return error instead of fatalf

YamasouA 2025-02-28 00:08:07 +09:00
parent f214d8e27a
commit 038b90d475

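Before the diff, a minimal, runnable sketch of the pattern this commit applies: helpers return an error annotated with the op index instead of calling tCtx.Fatalf, and only the top-level caller decides whether to abort the benchmark. The names runOp, runWorkload, and DataItem below are illustrative stand-ins, not the real scheduler_perf API.

package main

import (
	"errors"
	"fmt"
)

type DataItem string

// runOp returns an annotated error instead of terminating the test itself,
// so the caller keeps control over how the failure is reported.
func runOp(opIndex int, fail bool) (DataItem, error) {
	if fail {
		return "", fmt.Errorf("op %d: %v", opIndex, errors.New("pods not scheduled"))
	}
	return DataItem(fmt.Sprintf("item-%d", opIndex)), nil
}

// runWorkload propagates the first failure upward, mirroring the new
// ([]DataItem, error) signature in the diff.
func runWorkload() ([]DataItem, error) {
	var dataItems []DataItem
	for opIndex, fail := range []bool{false, true} {
		item, err := runOp(opIndex, fail)
		if err != nil {
			return nil, err
		}
		dataItems = append(dataItems, item)
	}
	return dataItems, nil
}

func main() {
	// In the real benchmark this is the single place where tCtx.Fatalf runs.
	if _, err := runWorkload(); err != nil {
		fmt.Println("workload failed:", err)
	}
}

Wrapping with %w instead of %v would additionally keep the underlying error available to errors.Is and errors.As; the diff keeps %v, which leaves the message text of the removed Fatalf calls unchanged.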

@@ -1194,7 +1194,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
b.Fatalf("workload %s is not valid: %v", w.Name, err)
}
results := runWorkload(tCtx, tc, w, informerFactory)
results, err := runWorkload(tCtx, tc, w, informerFactory)
if err != nil {
tCtx.Fatalf("%w: %s", w.Name, err)
}
dataItems.DataItems = append(dataItems.DataItems, results...)
if len(results) > 0 {
@@ -1410,7 +1413,7 @@ func checkEmptyInFlightEvents() error {
return nil
}
func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector, error) {
collectorCtx := ktesting.WithCancel(tCtx)
workloadName := tCtx.Name()
// The first part is the same for each workload, therefore we can strip it.
@@ -1421,7 +1424,7 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
collector := collector
err := collector.init()
if err != nil {
tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
return nil, nil, fmt.Errorf("op %d: Failed to initialize data collector: %v", opIndex, err)
}
collectorWG.Add(1)
go func() {
@@ -1429,12 +1432,12 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
collector.run(collectorCtx)
}()
}
return collectorCtx, collectors
return collectorCtx, collectors, nil
}
func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) ([]DataItem, error) {
if collectorCtx == nil {
tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
return nil, fmt.Errorf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
}
collectorCtx.Cancel("collecting metrics, collector must stop first")
collectorWG.Wait()
@@ -1447,7 +1450,7 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
tCtx.Errorf("op %d: %s", opIndex, err)
}
}
return dataItems
return dataItems, nil
}
type WorkloadExecutor struct {
@@ -1465,7 +1468,7 @@ type WorkloadExecutor struct {
nextNodeIndex int
}
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) ([]DataItem, error) {
b, benchmarking := tCtx.TB().(*testing.B)
if benchmarking {
start := time.Now()
@@ -1513,70 +1516,74 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
return nil, fmt.Errorf("op %d: %v", opIndex, err)
}
select {
case <-tCtx.Done():
tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
return nil, fmt.Errorf("op %d: %v", opIndex, context.Cause(tCtx))
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
executor.runCreateNodesOp(opIndex, concreteOp)
err = executor.runCreateNodesOp(opIndex, concreteOp)
case *createNamespacesOp:
executor.runCreateNamespaceOp(opIndex, concreteOp)
err = executor.runCreateNamespaceOp(opIndex, concreteOp)
case *createPodsOp:
executor.runCreatePodsOp(opIndex, concreteOp)
err = executor.runCreatePodsOp(opIndex, concreteOp)
case *deletePodsOp:
executor.runDeletePodsOp(opIndex, concreteOp)
err = executor.runDeletePodsOp(opIndex, concreteOp)
case *churnOp:
executor.runChurnOp(opIndex, concreteOp)
err = executor.runChurnOp(opIndex, concreteOp)
case *barrierOp:
executor.runBarrierOp(opIndex, concreteOp)
err = executor.runBarrierOp(opIndex, concreteOp)
case *sleepOp:
executor.runSleepOp(concreteOp)
case *startCollectingMetricsOp:
executor.runStartCollectingMetricsOp(opIndex, concreteOp)
err = executor.runStartCollectingMetricsOp(opIndex, concreteOp)
case *stopCollectingMetricsOp:
executor.runStopCollectingMetrics(opIndex)
err = executor.runStopCollectingMetrics(opIndex)
default:
executor.runDefaultOp(opIndex, concreteOp)
err = executor.runDefaultOp(opIndex, concreteOp)
}
if err != nil {
return nil, err
}
}
// check unused params and inform users
unusedParams := w.unusedParams()
if len(unusedParams) != 0 {
tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
return nil, fmt.Errorf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
}
// Some tests have unschedulable pods. Do not add an implicit barrier at the
// end as we do not want to wait for them.
return executor.dataItems
return executor.dataItems, nil
}
func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) {
func (e *WorkloadExecutor) runCreateNodesOp(opIndex int, op *createNodesOp) error {
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), op, e.tCtx.Client())
if err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
if err := nodePreparer.PrepareNodes(e.tCtx, e.nextNodeIndex); err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
e.nextNodeIndex += op.Count
return nil
}
func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) {
func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespacesOp) error {
nsPreparer, err := newNamespacePreparer(e.tCtx, op)
if err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
if err := nsPreparer.prepare(e.tCtx); err != nil {
err2 := nsPreparer.cleanup(e.tCtx)
if err2 != nil {
err = fmt.Errorf("prepare: %w; cleanup: %w", err, err2)
}
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
for _, n := range nsPreparer.namespaces() {
if _, ok := e.numPodsScheduledPerNamespace[n]; ok {
@@ -1585,25 +1592,26 @@ func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespace
}
e.numPodsScheduledPerNamespace[n] = 0
}
return nil
}
func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) error {
for _, namespace := range op.Namespaces {
if _, ok := e.numPodsScheduledPerNamespace[namespace]; !ok {
e.tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
return fmt.Errorf("op %d: unknown namespace %s", opIndex, namespace)
}
}
switch op.StageRequirement {
case Attempted:
if err := waitUntilPodsAttempted(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
case Scheduled:
// Default should be treated like "Scheduled", so handling both in the same way.
fallthrough
default:
if err := waitUntilPodsScheduled(e.tCtx, e.podInformer, op.LabelSelector, op.Namespaces, e.numPodsScheduledPerNamespace); err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
@@ -1615,6 +1623,7 @@ func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
}
}
}
return nil
}
func (e *WorkloadExecutor) runSleepOp(op *sleepOp) {
@@ -1624,13 +1633,17 @@ func (e *WorkloadExecutor) runSleepOp(op *sleepOp) {
}
}
func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) {
items := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
func (e *WorkloadExecutor) runStopCollectingMetrics(opIndex int) error {
items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
if err != nil {
return err
}
e.dataItems = append(e.dataItems, items...)
e.collectorCtx = nil
return nil
}
func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) error {
// define Pod's namespace automatically, and create that namespace.
namespace := fmt.Sprintf("namespace-%d", opIndex)
if op.Namespace != nil {
@@ -1643,9 +1656,13 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
if op.CollectMetrics {
if e.collectorCtx != nil {
e.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
return fmt.Errorf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
var err error
e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
if err != nil {
return err
}
e.collectorCtx, e.collectors = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
e.tCtx.TB().Cleanup(func() {
if e.collectorCtx != nil {
e.collectorCtx.Cancel("cleaning up")
@@ -1653,7 +1670,7 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
})
}
if err := createPodsRapidly(e.tCtx, namespace, op); err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
switch {
case op.SkipWaitToCompletion:
@@ -1662,29 +1679,33 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
e.numPodsScheduledPerNamespace[namespace] += op.Count
case op.SteadyState:
if err := createPodsSteadily(e.tCtx, namespace, e.podInformer, op); err != nil {
e.tCtx.Fatalf("op %d: %v", opIndex, err)
return fmt.Errorf("op %d: %v", opIndex, err)
}
default:
if err := waitUntilPodsScheduledInNamespace(e.tCtx, e.podInformer, nil, namespace, op.Count); err != nil {
e.tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
return fmt.Errorf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
if op.CollectMetrics {
// CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been
// scheduled.
items := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
items, err := stopCollectingMetrics(e.tCtx, e.collectorCtx, &e.collectorWG, e.workload.Threshold, *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
if err != nil {
return err
}
e.dataItems = append(e.dataItems, items...)
e.collectorCtx = nil
}
return nil
}
func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) error {
labelSelector := labels.ValidatedSetSelector(op.LabelSelector)
podsToDelete, err := e.podInformer.Lister().Pods(op.Namespace).List(labelSelector)
if err != nil {
e.tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err)
return fmt.Errorf("op %d: error in listing pods in the namespace %s: %v", opIndex, op.Namespace, err)
}
deletePods := func(opIndex int) {
@@ -1727,9 +1748,10 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
} else {
deletePods(opIndex)
}
return nil
}
func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) error {
var namespace string
if op.Namespace != nil {
namespace = *op.Namespace
@@ -1740,7 +1762,7 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := e.tCtx.Client().CoreV1().Namespaces().Create(e.tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
e.tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
return fmt.Errorf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}
var churnFns []func(name string) string
@@ -1748,12 +1770,12 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
for i, path := range op.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
e.tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
return fmt.Errorf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
e.tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
return fmt.Errorf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
@@ -1833,41 +1855,49 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
}
}()
}
return nil
}
func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) {
func (e *WorkloadExecutor) runDefaultOp(opIndex int, op realOp) error {
runable, ok := op.(runnableOp)
if !ok {
e.tCtx.Fatalf("op %d: invalid op %v", opIndex, op)
return fmt.Errorf("op %d: invalid op %v", opIndex, op)
}
for _, namespace := range runable.requiredNamespaces() {
createNamespaceIfNotPresent(e.tCtx, namespace, &e.numPodsScheduledPerNamespace)
}
runable.run(e.tCtx)
return nil
}
func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) {
func (e *WorkloadExecutor) runStartCollectingMetricsOp(opIndex int, op *startCollectingMetricsOp) error {
if e.collectorCtx != nil {
e.tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
return fmt.Errorf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
var err error
e.collectorCtx, e.collectors, err = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector)
if err != nil {
return err
}
e.collectorCtx, e.collectors = startCollectingMetrics(e.tCtx, &e.collectorWG, e.podInformer, e.testCase.MetricsCollectorConfig, e.throughputErrorMargin, opIndex, op.Name, op.Namespaces, op.LabelSelector)
e.tCtx.TB().Cleanup(func() {
if e.collectorCtx != nil {
e.collectorCtx.Cancel("cleaning up")
}
})
return nil
}
func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) error {
if _, ok := (*podsPerNamespace)[namespace]; !ok {
// The namespace has not been created yet.
// So, create it and register it.
_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
if err != nil {
tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
return fmt.Errorf("failed to create namespace for Pod: %v", namespace)
}
(*podsPerNamespace)[namespace] = 0
}
return nil
}
type testDataCollector interface {