use *e.tCtx

author YamasouA 2025-02-13 23:33:13 +09:00
parent cc87cb54ab
commit a9ee6bdf81

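The diff below is a simple aliasing change: WorkloadExecutor keeps a pointer to its ktesting.TContext, and instead of copying it into a per-method local (tCtx := *e.tCtx), each call site now dereferences the pointer directly ((*e.tCtx)), so every use reads whatever value the pointer points at in that moment. A minimal sketch of the behavioral difference, in plain Go; ctxLike is a hypothetical stand-in for ktesting.TContext (an interface in the real code), not the scheduler_perf code itself:

package main

import "fmt"

// ctxLike stands in for ktesting.TContext (an interface in the real code).
type ctxLike interface{ Name() string }

type namedCtx string

func (n namedCtx) Name() string { return string(n) }

// executor mirrors WorkloadExecutor's tCtx field: a pointer to the
// interface value, not the value itself.
type executor struct {
	tCtx *ctxLike
}

func main() {
	var current ctxLike = namedCtx("initial")
	e := &executor{tCtx: &current}

	// Like the removed `tCtx := *e.tCtx`: a snapshot taken once.
	snapshot := *e.tCtx

	// The owner swaps the underlying context (e.g. for a later op).
	current = namedCtx("per-op")

	fmt.Println(snapshot.Name())  // "initial" - the copy went stale
	fmt.Println((*e.tCtx).Name()) // "per-op"  - dereference sees the update
}

Whether the pointed-to context is actually swapped mid-run depends on the surrounding runWorkload code, which is not shown in this commit; the sketch only demonstrates why a snapshot and a fresh dereference can diverge.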

@@ -1579,29 +1579,27 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 }

 func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) {
-	tCtx := *e.tCtx
-	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, tCtx.Client())
+	nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, (*e.tCtx).Client())
 	if err != nil {
-		tCtx.Fatalf("op %d: %v", opIndex, err)
+		(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nodePreparer.PrepareNodes((*e.tCtx), e.nextNodeIndex); err != nil {
-		tCtx.Fatalf("op %d: %v", opIndex, err)
+	if err := nodePreparer.PrepareNodes(*e.tCtx, e.nextNodeIndex); err != nil {
+		(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 	}
 	e.nextNodeIndex += op.Count
 }

 func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) {
-	tCtx := *e.tCtx
-	nsPreparer, err := newNamespacePreparer(tCtx, op)
+	nsPreparer, err := newNamespacePreparer(*e.tCtx, op)
 	if err != nil {
-		tCtx.Fatalf("op %d: %v", opIndex, err)
+		(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 	}
-	if err := nsPreparer.prepare(tCtx); err != nil {
-		err2 := nsPreparer.cleanup(tCtx)
+	if err := nsPreparer.prepare(*e.tCtx); err != nil {
+		err2 := nsPreparer.cleanup(*e.tCtx)
 		if err2 != nil {
 			err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
 		}
-		tCtx.Fatalf("op %d: %v", opIndex, err)
+		(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 	}
 	for _, n := range nsPreparer.namespaces() {
 		if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
@@ -1613,23 +1611,22 @@ func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespace
 }

 func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
-	tCtx := *e.tCtx
 	for _, namespace := range op.Namespaces {
 		if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
-			tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
+			(*e.tCtx).Fatalf("op %d: unknown namespace %s", opIndex, namespace)
 		}
 	}
 	switch op.StageRequirement {
 	case Attempted:
-		if err := waitUntilPodsAttempted(tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
-			tCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := waitUntilPodsAttempted(*e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
+			(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 		}
 	case Scheduled:
 		// Default should be treated like "Scheduled", so handling both in the same way.
 		fallthrough
 	default:
-		if err := waitUntilPodsScheduled(tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
-			tCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := waitUntilPodsScheduled(*e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
+			(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 		}
 		// At the end of the barrier, we can be sure that there are no pods
 		// pending scheduling in the namespaces that we just blocked on.
@@ -1644,34 +1641,31 @@ func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
 }

 func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) {
-	tCtx := *e.tCtx
 	collectorCtx := *e.collectorCtx
-	items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+	items := stopCollectingMetrics(*e.tCtx, collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
 	e.dataItems = append(e.dataItems, items...)
 	collectorCtx = nil
 }

 func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
-	tCtx := *e.tCtx
-	collectorCtx := *e.tCtx
 	// define Pod's namespace automatically, and create that namespace.
 	namespace := fmt.Sprintf("namespace-%d", opIndex)
 	if op.Namespace != nil {
 		namespace = *op.Namespace
 	}
-	createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace)
+	createNamespaceIfNotPresent(*e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
 	if op.PodTemplatePath == nil {
 		op.PodTemplatePath = e.testCase.DefaultPodTemplatePath
 	}

 	if op.CollectMetrics {
 		if *e.collectorCtx != nil {
-			tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+			(*e.tCtx).Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 		}
-		*e.collectorCtx, e.collectors = startCollectingMetrics(tCtx, e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
+		*e.collectorCtx, e.collectors = startCollectingMetrics(*e.tCtx, e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
 	}
-	if err := createPodsRapidly(tCtx, namespace, op); err != nil {
-		tCtx.Fatalf("op %d: %v", opIndex, err)
+	if err := createPodsRapidly(*e.tCtx, namespace, op); err != nil {
+		(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 	}
 	switch {
 	case op.SkipWaitToCompletion:
@@ -1679,31 +1673,30 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
 		// in the future.
 		e.numPodsScheduledPerNamespace[namespace] += op.Count
 	case op.SteadyState:
-		if err := createPodsSteadily(tCtx, namespace, e.podInformer, op); err != nil {
-			tCtx.Fatalf("op %d: %v", opIndex, err)
+		if err := createPodsSteadily(*e.tCtx, namespace, e.podInformer, op); err != nil {
+			(*e.tCtx).Fatalf("op %d: %v", opIndex, err)
 		}
 	default:
-		if err := waitUntilPodsScheduledInNamespace(tCtx, e.podInformer, nil, namespace, op.Count); err != nil {
-			tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
+		if err := waitUntilPodsScheduledInNamespace(*e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil {
+			(*e.tCtx).Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
 		}
 	}

 	if op.CollectMetrics {
 		// CollectMetrics and SkipWaitToCompletion can never be true at the
 		// same time, so if we're here, it means that all pods have been
 		// scheduled.
-		items := stopCollectingMetrics(tCtx, collectorCtx, e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
+		items := stopCollectingMetrics((*e.tCtx), (*e.collectorCtx), e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
 		e.dataItems = append(e.dataItems, items...)
 		*e.collectorCtx = nil
 	}
 }

 func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
-	tCtx := *e.tCtx
 	labelSelector := labels.ValidatedSetSelector(op.LabelSelector)
 	podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector)
 	if err != nil {
-		tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err)
+		(*e.tCtx).Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err)
 	}

 	deletePods := func(opIndex int) {
@@ -1714,13 +1707,13 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
 		for i := 0; i < len(podsToDelete); i++ {
 			select {
 			case <-ticker.C:
-				if err := tCtx.Client().CoreV1().Pods(op.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
+				if err := (*e.tCtx).Client().CoreV1().Pods(op.Namespace).Delete(*e.tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
 					if errors.Is(err, context.Canceled) {
 						return
 					}
-					tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
+					(*e.tCtx).Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
 				}
-			case <-tCtx.Done():
+			case <-(*e.tCtx).Done():
 				return
 			}
 		}
@@ -1729,11 +1722,11 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
 		listOpts := metav1.ListOptions{
 			LabelSelector: labelSelector.String(),
 		}
-		if err := tCtx.Client().CoreV1().Pods(op.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
+		if err := (*e.tCtx).Client().CoreV1().Pods(op.Namespace).DeleteCollection(*e.tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
 			if errors.Is(err, context.Canceled) {
 				return
 			}
-			tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err)
+			(*e.tCtx).Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, op.Namespace, err)
 		}
 	}

@@ -1749,18 +1742,17 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
 }

 func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
-	tCtx := *e.tCtx
 	var namespace string
 	if op.Namespace != nil {
 		namespace = *op.Namespace
 	} else {
 		namespace = fmt.Sprintf("namespace-%d", opIndex)
 	}
-	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient((*e.tCtx).Client().Discovery()))
 	// Ensure the namespace exists.
 	nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
-	if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-		tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+	if _, err := (*e.tCtx).Client().CoreV1().Namespaces().Create(*e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+		(*e.tCtx).Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
 	}

 	var churnFns []func(name string) string
@@ -1768,31 +1760,31 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 	for i, path := range op.TemplatePaths {
 		unstructuredObj, gvk, err := getUnstructuredFromFile(path)
 		if err != nil {
-			tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+			(*e.tCtx).Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
 		}
 		// Obtain GVR.
 		mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 		if err != nil {
-			tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+			(*e.tCtx).Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
 		}
 		gvr := mapping.Resource
 		// Distinguish cluster-scoped with namespaced API objects.
 		var dynRes dynamic.ResourceInterface
 		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
-			dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
+			dynRes = (*e.tCtx).Dynamic().Resource(gvr).Namespace(namespace)
 		} else {
-			dynRes = tCtx.Dynamic().Resource(gvr)
+			dynRes = (*e.tCtx).Dynamic().Resource(gvr)
 		}

 		churnFns = append(churnFns, func(name string) string {
 			if name != "" {
-				if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
-					tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
+				if err := dynRes.Delete(*e.tCtx, name, metav1.DeleteOptions{}); err != nil && !errors.Is(err, context.Canceled) {
+					(*e.tCtx).Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
 				}
 				return ""
 			}

-			live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
+			live, err := dynRes.Create(*e.tCtx, unstructuredObj, metav1.CreateOptions{})
 			if err != nil {
 				return ""
 			}
@@ -1823,7 +1815,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 				churnFns[i]("")
 			}
 			count++
-		case <-tCtx.Done():
+		case <-(*e.tCtx).Done():
 			return
 		}
 	}
@@ -1847,7 +1839,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 				retVals[i][count%op.Number] = churnFns[i](retVals[i][count%op.Number])
 			}
 			count++
-		case <-tCtx.Done():
+		case <-(*e.tCtx).Done():
 			return
 		}
 	}
@@ -1856,15 +1848,14 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
 }

 func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) {
-	tCtx := *e.tCtx
 	runable, ok := op.(runnableOp)
 	if !ok {
-		tCtx.Fatalf("op %d: invalid op %v", opIndex, op)
+		(*e.tCtx).Fatalf("op %d: invalid op %v", opIndex, op)
 	}
 	for _, namespace := range runable.requiredNamespaces() {
-		createNamespaceIfNotPresent(tCtx, namespace, &e.numPodsScheduledPerNamespace)
+		createNamespaceIfNotPresent(*e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
 	}
-	runable.run(tCtx)
+	runable.run(*e.tCtx)
 }

 func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) {