refactor runWorkload: extract per-operation helpers and shared state structs

YamasouA 2025-01-26 19:39:38 +09:00
parent 8f8c94a04d
commit 659804b765


@@ -1450,6 +1450,40 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
return dataItems
}
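// MetricsCollectionData holds the collectors and the per-collection context,
// wait group, and error margin used to start and stop metrics collection for a workload.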
type MetricsCollectionData struct {
Collectors []testDataCollector
// CollectorCtx and CollectorWG are kept separate from the workload's
// context and wait group because metrics collection needs to be sure
// that its goroutines are stopped.
CollectorCtx ktesting.TContext
CollectorWG *sync.WaitGroup
// ThroughputErrorMargin controls error checking of the sampling
// interval length in the throughput collector. By default it is zero
// and the check is disabled; when running benchmarks, samples that
// are not taken regularly are reported as a test failure.
ThroughputErrorMargin float64
}
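// WorkloadState tracks the mutable state accumulated while a workload's operations run.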
type WorkloadState struct {
DataItems []DataItem
NextNodeIndex int
// NumPodsScheduledPerNamespace has all namespaces created in the workload and the number of pods they (will) have.
// All namespaces listed in NumPodsScheduledPerNamespace will be cleaned up.
NumPodsScheduledPerNamespace map[string]int
}
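// SharedOperationData bundles the test context, informer, metrics state, and
// workload state that the per-operation helpers share.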
type SharedOperationData struct {
// PodInformer is an additional informer needed for testing. The pod
// informer was already created before (scheduler.NewInformerFactory) and the
// factory was started for it (mustSetupCluster), therefore we don't
// need to start it again.
PodInformer coreinformers.PodInformer
MetricsData *MetricsCollectionData
WorkloadState *WorkloadState
TCtx ktesting.TContext
// WG is shared by pointer because a sync.WaitGroup must not be copied.
WG *sync.WaitGroup
CancelFunc context.CancelFunc
}
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
b, benchmarking := tCtx.TB().(*testing.B)
if benchmarking {
@@ -1463,9 +1497,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
})
}
// Disable error checking of the sampling interval length in the
// throughput collector by default. When running benchmarks, report
// it as test failure when samples are not taken regularly.
var throughputErrorMargin float64
if benchmarking {
// TODO: To prevent the perf-test failure, we increased the error margin, if still not enough
@@ -1473,12 +1504,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
throughputErrorMargin = 30
}
// Additional informers needed for testing. The pod informer was
// already created before (scheduler.NewInformerFactory) and the
// factory was started for it (mustSetupCluster), therefore we don't
// need to start again.
podInformer := informerFactory.Core().V1().Pods()
// Everything else started by this function gets stopped before it returns.
tCtx = ktesting.WithCancel(tCtx)
var wg sync.WaitGroup
@@ -1486,315 +1511,25 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
defer tCtx.Cancel("workload is done")
var dataItems []DataItem
nextNodeIndex := 0
// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
numPodsScheduledPerNamespace := make(map[string]int)
var collectors []testDataCollector
// This needs a separate context and wait group because
// the metrics collecting needs to be sure that the goroutines
// are stopped.
var collectorCtx ktesting.TContext
var collectorWG sync.WaitGroup
defer collectorWG.Wait()
sharedOperationData := SharedOperationData{
TCtx: tCtx,
WG: &wg,
MetricsData: &MetricsCollectionData{
CollectorWG: &sync.WaitGroup{},
ThroughputErrorMargin: throughputErrorMargin,
},
WorkloadState: &WorkloadState{
NumPodsScheduledPerNamespace: make(map[string]int),
},
PodInformer: informerFactory.Core().V1().Pods(),
}
for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-tCtx.Done():
tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
nextNodeIndex += concreteOp.Count
case *createNamespacesOp:
nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if err := nsPreparer.prepare(tCtx); err != nil {
err2 := nsPreparer.cleanup(tCtx)
if err2 != nil {
err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
}
tCtx.Fatalf("op %d: %v", opIndex, err)
}
for _, n := range nsPreparer.namespaces() {
if _, ok := numPodsScheduledPerNamespace[n]; ok {
// this namespace has been already created.
continue
}
numPodsScheduledPerNamespace[n] = 0
}
case *createPodsOp:
var namespace string
// define Pod's namespace automatically, and create that namespace.
namespace = fmt.Sprintf("namespace-%d", opIndex)
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
}
createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
}
if concreteOp.CollectMetrics {
if collectorCtx != nil {
tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
defer collectorCtx.Cancel("cleaning up")
}
if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
switch {
case concreteOp.SkipWaitToCompletion:
// Only record those namespaces that may potentially require barriers
// in the future.
numPodsScheduledPerNamespace[namespace] += concreteOp.Count
case concreteOp.SteadyState:
if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
default:
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil {
tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
if concreteOp.CollectMetrics {
// CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been
// scheduled.
items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
dataItems = append(dataItems, items...)
collectorCtx = nil
}
case *deletePodsOp:
labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
if err != nil {
tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
}
deletePods := func(opIndex int) {
if concreteOp.DeletePodsPerSecond > 0 {
ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
defer ticker.Stop()
for i := 0; i < len(podsToDelete); i++ {
select {
case <-ticker.C:
if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
if errors.Is(err, context.Canceled) {
return
}
tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
}
case <-tCtx.Done():
return
}
}
return
}
listOpts := metav1.ListOptions{
LabelSelector: labelSelector.String(),
}
if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
if errors.Is(err, context.Canceled) {
return
}
tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
}
}
if concreteOp.SkipWaitToCompletion {
wg.Add(1)
go func(opIndex int) {
defer wg.Done()
deletePods(opIndex)
}(opIndex)
} else {
deletePods(opIndex)
}
case *churnOp:
var namespace string
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
} else {
namespace = fmt.Sprintf("namespace-%d", opIndex)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}
var churnFns []func(name string) string
for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
var dynRes dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
} else {
dynRes = tCtx.Dynamic().Resource(gvr)
}
churnFns = append(churnFns, func(name string) string {
if name != "" {
if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
}
return ""
}
live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
if err != nil {
return ""
}
return live.GetName()
})
}
var interval int64 = 500
if concreteOp.IntervalMilliseconds != 0 {
interval = concreteOp.IntervalMilliseconds
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
switch concreteOp.Mode {
case Create:
wg.Add(1)
go func() {
defer wg.Done()
count, threshold := 0, concreteOp.Number
if threshold == 0 {
threshold = math.MaxInt32
}
for count < threshold {
select {
case <-ticker.C:
for i := range churnFns {
churnFns[i]("")
}
count++
case <-tCtx.Done():
return
}
}
}()
case Recreate:
wg.Add(1)
go func() {
defer wg.Done()
retVals := make([][]string, len(churnFns))
// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
for i := range retVals {
retVals[i] = make([]string, concreteOp.Number)
}
count := 0
for {
select {
case <-ticker.C:
for i := range churnFns {
retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
}
count++
case <-tCtx.Done():
return
}
}
}()
}
case *barrierOp:
for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
}
}
switch concreteOp.StageRequirement {
case Attempted:
if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
case Scheduled:
// Default should be treated like "Scheduled", so handling both in the same way.
fallthrough
default:
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
if len(concreteOp.Namespaces) == 0 {
numPodsScheduledPerNamespace = make(map[string]int)
} else {
for _, namespace := range concreteOp.Namespaces {
delete(numPodsScheduledPerNamespace, namespace)
}
}
}
case *sleepOp:
select {
case <-tCtx.Done():
case <-time.After(concreteOp.Duration.Duration):
}
case *startCollectingMetricsOp:
if collectorCtx != nil {
tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
defer collectorCtx.Cancel("cleaning up")
case *stopCollectingMetricsOp:
items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
dataItems = append(dataItems, items...)
collectorCtx = nil
default:
runable, ok := concreteOp.(runnableOp)
if !ok {
tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
}
for _, namespace := range runable.requiredNamespaces() {
createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
}
runable.run(tCtx)
}
runOperation(tc, opIndex, op, w, &sharedOperationData)
}
// check unused params and inform users
@@ -1808,6 +1543,336 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
return dataItems
}
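// runCreateNodesOp prepares the nodes requested by a createNodesOp and advances the workload's NextNodeIndex.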
func runCreateNodesOp(opIndex int, concreteOp *createNodesOp, sharedOperationData *SharedOperationData) {
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, sharedOperationData.TCtx.Client())
if err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
if err := nodePreparer.PrepareNodes(sharedOperationData.TCtx, sharedOperationData.WorkloadState.NextNodeIndex); err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
sharedOperationData.WorkloadState.NextNodeIndex += concreteOp.Count
}
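// runCreateNamespacesOp creates the namespaces requested by a createNamespacesOp and registers them in NumPodsScheduledPerNamespace.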
func runCreateNamespacesOp(opIndex int, concreteOp *createNamespacesOp, sharedOperationData *SharedOperationData) {
nsPreparer, err := newNamespacePreparer(sharedOperationData.TCtx, concreteOp)
if err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
if err := nsPreparer.prepare(sharedOperationData.TCtx); err != nil {
err2 := nsPreparer.cleanup(sharedOperationData.TCtx)
if err2 != nil {
err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
}
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
for _, n := range nsPreparer.namespaces() {
if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n]; ok {
// This namespace has already been created.
continue
}
sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[n] = 0
}
}
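// runCreatePodsOp creates the pods requested by a createPodsOp and, depending on its settings, collects metrics and waits for the pods to be scheduled.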
func runCreatePodsOp(tc *testCase, w *workload, opIndex int, concreteOp *createPodsOp, sharedOperationData *SharedOperationData) {
// Define the Pod's namespace automatically, and create that namespace.
namespace := fmt.Sprintf("namespace-%d", opIndex)
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
}
createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace)
if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
}
if concreteOp.CollectMetrics {
if sharedOperationData.MetricsData.CollectorCtx != nil {
sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
defer sharedOperationData.MetricsData.CollectorCtx.Cancel("cleaning up")
}
if err := createPodsRapidly(sharedOperationData.TCtx, namespace, concreteOp); err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
switch {
case concreteOp.SkipWaitToCompletion:
// Only record those namespaces that may potentially require barriers
// in the future.
sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace] += concreteOp.Count
case concreteOp.SteadyState:
if err := createPodsSteadily(sharedOperationData.TCtx, namespace, sharedOperationData.PodInformer, concreteOp); err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
default:
if err := waitUntilPodsScheduledInNamespace(sharedOperationData.TCtx, sharedOperationData.PodInformer, nil, namespace, concreteOp.Count); err != nil {
sharedOperationData.TCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
if concreteOp.CollectMetrics {
// CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been
// scheduled.
items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors)
sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...)
sharedOperationData.MetricsData.CollectorCtx = nil
}
}
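// runDeletePodsOp deletes the pods selected by a deletePodsOp, either one by one at a fixed rate or with a single collection delete, optionally in the background.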
func runDeletePodsOp(opIndex int, concreteOp *deletePodsOp, sharedOperationData *SharedOperationData) {
labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
podsToDelete, err := sharedOperationData.PodInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
if err != nil {
sharedOperationData.TCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
}
deletePods := func(opIndex int) {
if concreteOp.DeletePodsPerSecond > 0 {
ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
defer ticker.Stop()
for i := 0; i < len(podsToDelete); i++ {
select {
case <-ticker.C:
if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(sharedOperationData.TCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
if errors.Is(err, context.Canceled) {
return
}
sharedOperationData.TCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
}
case <-sharedOperationData.TCtx.Done():
return
}
}
return
}
listOpts := metav1.ListOptions{
LabelSelector: labelSelector.String(),
}
if err := sharedOperationData.TCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(sharedOperationData.TCtx, metav1.DeleteOptions{}, listOpts); err != nil {
if errors.Is(err, context.Canceled) {
return
}
sharedOperationData.TCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
}
}
if concreteOp.SkipWaitToCompletion {
sharedOperationData.WG.Add(1)
go func(opIndex int) {
defer sharedOperationData.WG.Done()
deletePods(opIndex)
}(opIndex)
} else {
deletePods(opIndex)
}
}
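// runChurnOp churns the API objects described by a churnOp's templates in a background goroutine, creating new objects each tick (Create mode) or cycling through creating and deleting them (Recreate mode).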
func runChurnOp(opIndex int, concreteOp *churnOp, sharedOperationData *SharedOperationData) {
var namespace string
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
} else {
namespace = fmt.Sprintf("namespace-%d", opIndex)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(sharedOperationData.TCtx.Client().Discovery()))
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := sharedOperationData.TCtx.Client().CoreV1().Namespaces().Create(sharedOperationData.TCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
sharedOperationData.TCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}
var churnFns []func(name string) string
for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
sharedOperationData.TCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
sharedOperationData.TCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped from namespaced API objects.
var dynRes dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr).Namespace(namespace)
} else {
dynRes = sharedOperationData.TCtx.Dynamic().Resource(gvr)
}
churnFns = append(churnFns, func(name string) string {
if name != "" {
if err := dynRes.Delete(sharedOperationData.TCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
sharedOperationData.TCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
}
return ""
}
live, err := dynRes.Create(sharedOperationData.TCtx, unstructuredObj, metav1.CreateOptions{})
if err != nil {
return ""
}
return live.GetName()
})
}
var interval int64 = 500
if concreteOp.IntervalMilliseconds != 0 {
interval = concreteOp.IntervalMilliseconds
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
switch concreteOp.Mode {
case Create:
sharedOperationData.WG.Add(1)
go func() {
defer sharedOperationData.WG.Done()
count, threshold := 0, concreteOp.Number
if threshold == 0 {
threshold = math.MaxInt32
}
for count < threshold {
select {
case <-ticker.C:
for i := range churnFns {
churnFns[i]("")
}
count++
case <-sharedOperationData.TCtx.Done():
return
}
}
}()
case Recreate:
sharedOperationData.WG.Add(1)
go func() {
defer sharedOperationData.WG.Done()
retVals := make([][]string, len(churnFns))
// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
for i := range retVals {
retVals[i] = make([]string, concreteOp.Number)
}
count := 0
for {
select {
case <-ticker.C:
for i := range churnFns {
retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
}
count++
case <-sharedOperationData.TCtx.Done():
return
}
}
}()
}
}
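// runBarrierOp blocks until the pods in the given namespaces have been attempted or scheduled, depending on the barrier's stage requirement.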
func runBarrierOp(opIndex int, concreteOp *barrierOp, sharedOperationData *SharedOperationData) {
for _, namespace := range concreteOp.Namespaces {
if _, ok := sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace[namespace]; !ok {
sharedOperationData.TCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
}
}
switch concreteOp.StageRequirement {
case Attempted:
if err := waitUntilPodsAttempted(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
case Scheduled:
// Default should be treated like "Scheduled", so handling both in the same way.
fallthrough
default:
if err := waitUntilPodsScheduled(sharedOperationData.TCtx, sharedOperationData.PodInformer, concreteOp.LabelSelector, concreteOp.Namespaces, sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace); err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
if len(concreteOp.Namespaces) == 0 {
sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace = make(map[string]int)
} else {
for _, namespace := range concreteOp.Namespaces {
delete(sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace, namespace)
}
}
}
}
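// runSleepOp sleeps for the configured duration or until the test context is canceled, whichever comes first.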
func runSleepOp(concreteOp *sleepOp, sharedOperationData *SharedOperationData) {
select {
case <-sharedOperationData.TCtx.Done():
case <-time.After(concreteOp.Duration.Duration):
}
}
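// runStartCollectingMetricsOp starts metrics collection for a startCollectingMetricsOp, failing if a collection is already running.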
func runStartCollectingMetricsOp(opIndex int, tc *testCase, concreteOp *startCollectingMetricsOp, sharedOperationData *SharedOperationData) {
if sharedOperationData.MetricsData.CollectorCtx != nil {
sharedOperationData.TCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.Collectors = startCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorWG, sharedOperationData.PodInformer, tc.MetricsCollectorConfig, sharedOperationData.MetricsData.ThroughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
}
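// runStopCollectingMetricsOp stops the running metrics collectors and appends their results to the workload's DataItems.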
func runStopCollectingMetricsOp(opIndex int, w *workload, sharedOperationData *SharedOperationData) {
items := stopCollectingMetrics(sharedOperationData.TCtx, sharedOperationData.MetricsData.CollectorCtx, sharedOperationData.MetricsData.CollectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, sharedOperationData.MetricsData.Collectors)
sharedOperationData.WorkloadState.DataItems = append(sharedOperationData.WorkloadState.DataItems, items...)
sharedOperationData.MetricsData.CollectorCtx = nil
}
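// runDefault handles ops that implement runnableOp: it creates their required namespaces and then runs them.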
func runDefault(opIndex int, concreteOp realOp, sharedOperationData *SharedOperationData) {
runable, ok := concreteOp.(runnableOp)
if !ok {
sharedOperationData.TCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
}
for _, namespace := range runable.requiredNamespaces() {
createNamespaceIfNotPresent(sharedOperationData.TCtx, namespace, &sharedOperationData.WorkloadState.NumPodsScheduledPerNamespace)
}
runable.run(sharedOperationData.TCtx)
}
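// runOperation patches the op's parameters for the workload, checks for cancellation, and dispatches to the helper for the concrete op type.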
func runOperation(tc *testCase, opIndex int, op op, w *workload, sharedOperationData *SharedOperationData) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-sharedOperationData.TCtx.Done():
sharedOperationData.TCtx.Fatalf("op %d: %v", opIndex, context.Cause(sharedOperationData.TCtx))
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
runCreateNodesOp(opIndex, concreteOp, sharedOperationData)
case *createNamespacesOp:
runCreateNamespacesOp(opIndex, concreteOp, sharedOperationData)
case *createPodsOp:
runCreatePodsOp(tc, w, opIndex, concreteOp, sharedOperationData)
case *deletePodsOp:
runDeletePodsOp(opIndex, concreteOp, sharedOperationData)
case *churnOp:
runChurnOp(opIndex, concreteOp, sharedOperationData)
case *barrierOp:
runBarrierOp(opIndex, concreteOp, sharedOperationData)
case *sleepOp:
runSleepOp(concreteOp, sharedOperationData)
case *startCollectingMetricsOp:
runStartCollectingMetricsOp(opIndex, tc, concreteOp, sharedOperationData)
case *stopCollectingMetricsOp:
runStopCollectingMetricsOp(opIndex, w, sharedOperationData)
default:
runDefault(opIndex, concreteOp, sharedOperationData)
}
}
func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
if _, ok := (*podsPerNamespace)[namespace]; !ok {
// The namespace has not been created yet.