diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index c8aa46f5cb9..619260cb6ee 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -5287,7 +5287,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) manager.queue.Add(testutil.GetKey(job, t)) - manager.processNextWorkItem(context.TODO()) + manager.processNextWorkItem(ctx) if tc.wantRequeued { verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1) } else { @@ -5297,7 +5297,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { // into the queue asynchronously. manager.clock.Sleep(fastJobApiBackoff) time.Sleep(time.Millisecond) - verifyEmptyQueue(ctx, t, manager) + verifyEmptyQueue(t, manager) } }) } @@ -5570,7 +5570,7 @@ func TestGetPodsForJob(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(p) } - pods, err := jm.getPodsForJob(context.TODO(), job) + pods, err := jm.getPodsForJob(ctx, job) if err != nil { t.Fatalf("getPodsForJob() error: %v", err) } @@ -5961,7 +5961,7 @@ func TestWatchJobs(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) sharedInformerFactory.Start(stopCh) - go manager.Run(context.TODO(), 1) + go manager.Run(ctx, 1) // We're sending new job to see if it reaches syncHandler. testJob.Namespace = "bar" @@ -6008,7 +6008,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh) - go manager.Run(context.TODO(), 1) + go manager.Run(ctx, 1) pods := newPodList(1, v1.PodRunning, testJob) testPod := pods[0] @@ -6035,7 +6035,7 @@ func TestWatchOrphanPods(t *testing.T) { podInformer := sharedInformers.Core().V1().Pods().Informer() go podInformer.Run(stopCh) cache.WaitForCacheSync(stopCh, podInformer.HasSynced) - go manager.Run(context.TODO(), 1) + go manager.Run(ctx, 1) // Create job but don't add it to the store. cases := map[string]struct { @@ -6297,7 +6297,7 @@ func TestJobApiBackoffReset(t *testing.T) { // error returned make the key requeued fakePodControl.Err = errors.New("Controller error") manager.queue.Add(key) - manager.processNextWorkItem(context.TODO()) + manager.processNextWorkItem(ctx) retries := manager.queue.NumRequeues(key) if retries != 1 { t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries) @@ -6307,8 +6307,8 @@ func TestJobApiBackoffReset(t *testing.T) { // the queue is emptied on success fakePodControl.Err = nil - manager.processNextWorkItem(context.TODO()) - verifyEmptyQueue(ctx, t, manager) + manager.processNextWorkItem(ctx) + verifyEmptyQueue(t, manager) } var _ workqueue.TypedRateLimitingInterface[string] = &fakeRateLimitingQueue{} @@ -7091,13 +7091,13 @@ func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPol func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { t.Helper() - verifyEmptyQueue(ctx, t, jm) + verifyEmptyQueue(t, jm) awaitForQueueLen(ctx, t, jm, wantQueueLen) } func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { t.Helper() - verifyEmptyQueue(ctx, t, jm) + verifyEmptyQueue(t, jm) if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) { if requeued := jm.queue.Len() == wantQueueLen; requeued { return true, nil @@ -7109,7 +7109,7 @@ func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQue } } -func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) { +func verifyEmptyQueue(t *testing.T, jm *Controller) { t.Helper() if jm.queue.Len() > 0 { t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len()) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 427e62b466c..cefd59195e5 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -83,7 +83,7 @@ func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *baseme cmpErr = nil value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...)) if err != nil { - return true, fmt.Errorf("collecting the %q metric: %q", counterVec.Name, err) + return true, fmt.Errorf("collecting the %q metric: %w", counterVec.Name, err) } if wantMetric.Value != int(value) { cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVec.Name, wantMetric.Labels, wantMetric.Value, int(value)) @@ -92,7 +92,7 @@ func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *baseme return true, nil }) if err != nil { - t.Errorf("Failed waiting for expected metric: %q", err) + t.Errorf("Failed waiting for expected metric: %v", err) } if cmpErr != nil { t.Error(cmpErr) @@ -471,8 +471,8 @@ func TestJobPodFailurePolicy(t *testing.T) { return true } - if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil { - t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name) + if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil { + t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name) } if test.restartController { @@ -488,8 +488,8 @@ func TestJobPodFailurePolicy(t *testing.T) { }) if test.wantJobConditionType == batchv1.JobComplete { - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { - t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err) + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err) } } validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType) @@ -1034,8 +1034,8 @@ func TestBackoffLimitPerIndex_Reenabling(t *testing.T) { validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To("2")) // mark remaining pods are Succeeded and verify Job status - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { - t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err) + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err) } validateJobFailed(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) @@ -1529,11 +1529,11 @@ func TestBackoffLimitPerIndex(t *testing.T) { for _, podTermination := range test.podTerminations { pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index) if err != nil { - t.Fatalf("listing Job Pods: %q", err) + t.Fatalf("listing Job Pods: %v", err) } pod.Status = podTermination.status if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil { - t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err) + t.Fatalf("Error updating the pod %q: %v", klog.KObj(pod), err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: podTermination.wantActive, @@ -1560,8 +1560,8 @@ func TestBackoffLimitPerIndex(t *testing.T) { remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive if remainingActive > 0 { - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil { - t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err) + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err) } } validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType) @@ -1789,7 +1789,7 @@ func TestManagedBy_Reenabling(t *testing.T) { ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig) // Marking the pod as finished, but it does not result in updating of the Job status. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj)) } @@ -1855,7 +1855,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) { }) // Marking the pod as complete queues the job reconciliation - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj)) } @@ -2053,7 +2053,7 @@ func TestNonParallelJob(t *testing.T) { ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig) // Failed Pod is replaced. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -2072,7 +2072,7 @@ func TestNonParallelJob(t *testing.T) { ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig) // No more Pods are created after the Pod succeeds. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobComplete(ctx, t, clientSet, jobObj) @@ -2117,14 +2117,14 @@ func TestParallelJob(t *testing.T) { validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // Tracks ready pods, if enabled. - if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil { + if _, err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil { t.Fatalf("Failed Marking Pods as ready: %v", err) } want.Ready = ptr.To[int32](2) validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // Failed Pods are replaced. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } want = podsByStatus{ @@ -2135,7 +2135,7 @@ func TestParallelJob(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // Once one Pod succeeds, no more Pods are created, even if some fail. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } want = podsByStatus{ @@ -2146,7 +2146,7 @@ func TestParallelJob(t *testing.T) { Terminating: ptr.To[int32](0), } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } want = podsByStatus{ @@ -2158,7 +2158,7 @@ func TestParallelJob(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // No more Pods are created after remaining Pods succeed. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) } validateJobComplete(ctx, t, clientSet, jobObj) @@ -2231,8 +2231,8 @@ func TestParallelJobChangingParallelism(t *testing.T) { }) // Succeed Job - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil { - t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil { + t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) } validateJobComplete(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -2271,14 +2271,14 @@ func TestParallelJobWithCompletions(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // Tracks ready pods, if enabled. - if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil { + if _, err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil { t.Fatalf("Failed Marking Pods as ready: %v", err) } want.Ready = ptr.To[int32](52) validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // Failed Pods are replaced. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } want = podsByStatus{ @@ -2289,7 +2289,7 @@ func TestParallelJobWithCompletions(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // Pods are created until the number of succeeded Pods equals completions. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } want = podsByStatus{ @@ -2301,7 +2301,7 @@ func TestParallelJobWithCompletions(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) // No more Pods are created after the Job completes. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) } validateJobComplete(ctx, t, clientSet, jobObj) @@ -2391,7 +2391,7 @@ func TestIndexedJob(t *testing.T) { }) // Remaining Pods succeed. - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { t.Fatal("Failed trying to succeed remaining pods") } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -2993,7 +2993,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) { }) remaining := int(tc.nPods) if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { - if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil { + if succ, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil { remaining -= succ b.Logf("Transient failure succeeding pods: %v", err) return false, nil @@ -3086,7 +3086,7 @@ func BenchmarkLargeFailureHandling(b *testing.B) { b.StartTimer() remaining := int(tc.nPods) if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { - if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil { + if fail, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil { remaining -= fail b.Logf("Transient failure failing pods: %v", err) return false, nil @@ -3205,7 +3205,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { // Fail a pod ASAP. err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { return false, nil } return true, nil @@ -3241,7 +3241,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) { }) // Fail the first pod - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -3252,7 +3252,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) { }) // Fail the second pod - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -3341,12 +3341,12 @@ func TestJobFailedWithInterrupts(t *testing.T) { Terminating: ptr.To[int32](0), }) t.Log("Finishing pods") - if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { t.Fatalf("Could not fail a pod: %v", err) } remaining := 9 if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { - if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil { + if succ, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil { remaining -= succ t.Logf("Transient failure succeeding pods: %v", err) return false, nil @@ -3776,7 +3776,7 @@ func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset } } -func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) { +func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (int, error) { op := func(p *v1.Pod) bool { p.Status.Phase = phase if phase == v1.PodFailed || phase == v1.PodSucceeded { @@ -3795,7 +3795,7 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt) } -func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) { +func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (int, error) { op := func(p *v1.Pod) bool { if podutil.IsPodReady(p) { return false @@ -3809,10 +3809,10 @@ func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt) } -func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) { +func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (int, error) { pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { - return fmt.Errorf("listing Job Pods: %w", err), 0 + return 0, fmt.Errorf("listing Job Pods: %w", err) } updates := make([]v1.Pod, 0, cnt) for _, pod := range pods.Items { @@ -3828,9 +3828,9 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job } successful, err := updatePodStatuses(ctx, clientSet, updates) if successful != cnt { - return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful + return successful, fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful) } - return err, successful + return successful, err } func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {