Cleanup Job controller tests

This commit is contained in:
Michal Wozniak 2024-07-05 14:59:03 +02:00
parent 58c44005cd
commit 4250d444f8
2 changed files with 53 additions and 53 deletions

View File

@ -5287,7 +5287,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
manager.queue.Add(testutil.GetKey(job, t))
manager.processNextWorkItem(context.TODO())
manager.processNextWorkItem(ctx)
if tc.wantRequeued {
verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1)
} else {
@ -5297,7 +5297,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
// into the queue asynchronously.
manager.clock.Sleep(fastJobApiBackoff)
time.Sleep(time.Millisecond)
verifyEmptyQueue(ctx, t, manager)
verifyEmptyQueue(t, manager)
}
})
}
@ -5570,7 +5570,7 @@ func TestGetPodsForJob(t *testing.T) {
informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
}
pods, err := jm.getPodsForJob(context.TODO(), job)
pods, err := jm.getPodsForJob(ctx, job)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
@ -5961,7 +5961,7 @@ func TestWatchJobs(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformerFactory.Start(stopCh)
go manager.Run(context.TODO(), 1)
go manager.Run(ctx, 1)
// We're sending new job to see if it reaches syncHandler.
testJob.Namespace = "bar"
@ -6008,7 +6008,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
go manager.Run(context.TODO(), 1)
go manager.Run(ctx, 1)
pods := newPodList(1, v1.PodRunning, testJob)
testPod := pods[0]
@ -6035,7 +6035,7 @@ func TestWatchOrphanPods(t *testing.T) {
podInformer := sharedInformers.Core().V1().Pods().Informer()
go podInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
go manager.Run(context.TODO(), 1)
go manager.Run(ctx, 1)
// Create job but don't add it to the store.
cases := map[string]struct {
@ -6297,7 +6297,7 @@ func TestJobApiBackoffReset(t *testing.T) {
// error returned make the key requeued
fakePodControl.Err = errors.New("Controller error")
manager.queue.Add(key)
manager.processNextWorkItem(context.TODO())
manager.processNextWorkItem(ctx)
retries := manager.queue.NumRequeues(key)
if retries != 1 {
t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
@ -6307,8 +6307,8 @@ func TestJobApiBackoffReset(t *testing.T) {
// the queue is emptied on success
fakePodControl.Err = nil
manager.processNextWorkItem(context.TODO())
verifyEmptyQueue(ctx, t, manager)
manager.processNextWorkItem(ctx)
verifyEmptyQueue(t, manager)
}
var _ workqueue.TypedRateLimitingInterface[string] = &fakeRateLimitingQueue{}
@ -7091,13 +7091,13 @@ func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPol
func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
t.Helper()
verifyEmptyQueue(ctx, t, jm)
verifyEmptyQueue(t, jm)
awaitForQueueLen(ctx, t, jm, wantQueueLen)
}
func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
t.Helper()
verifyEmptyQueue(ctx, t, jm)
verifyEmptyQueue(t, jm)
if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) {
if requeued := jm.queue.Len() == wantQueueLen; requeued {
return true, nil
@ -7109,7 +7109,7 @@ func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQue
}
}
func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) {
func verifyEmptyQueue(t *testing.T, jm *Controller) {
t.Helper()
if jm.queue.Len() > 0 {
t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len())

View File

@ -83,7 +83,7 @@ func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *baseme
cmpErr = nil
value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...))
if err != nil {
return true, fmt.Errorf("collecting the %q metric: %q", counterVec.Name, err)
return true, fmt.Errorf("collecting the %q metric: %w", counterVec.Name, err)
}
if wantMetric.Value != int(value) {
cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVec.Name, wantMetric.Labels, wantMetric.Value, int(value))
@ -92,7 +92,7 @@ func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *baseme
return true, nil
})
if err != nil {
t.Errorf("Failed waiting for expected metric: %q", err)
t.Errorf("Failed waiting for expected metric: %v", err)
}
if cmpErr != nil {
t.Error(cmpErr)
@ -471,8 +471,8 @@ func TestJobPodFailurePolicy(t *testing.T) {
return true
}
if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name)
}
if test.restartController {
@ -488,8 +488,8 @@ func TestJobPodFailurePolicy(t *testing.T) {
})
if test.wantJobConditionType == batchv1.JobComplete {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
}
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
@ -1034,8 +1034,8 @@ func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To("2"))
// mark remaining pods are Succeeded and verify Job status
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobFailed(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
@ -1529,11 +1529,11 @@ func TestBackoffLimitPerIndex(t *testing.T) {
for _, podTermination := range test.podTerminations {
pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
if err != nil {
t.Fatalf("listing Job Pods: %q", err)
t.Fatalf("listing Job Pods: %v", err)
}
pod.Status = podTermination.status
if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err)
t.Fatalf("Error updating the pod %q: %v", klog.KObj(pod), err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: podTermination.wantActive,
@ -1560,8 +1560,8 @@ func TestBackoffLimitPerIndex(t *testing.T) {
remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive
if remainingActive > 0 {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
}
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
@ -1789,7 +1789,7 @@ func TestManagedBy_Reenabling(t *testing.T) {
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
// Marking the pod as finished, but it does not result in updating of the Job status.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
}
@ -1855,7 +1855,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
})
// Marking the pod as complete queues the job reconciliation
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
}
@ -2053,7 +2053,7 @@ func TestNonParallelJob(t *testing.T) {
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
// Failed Pod is replaced.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -2072,7 +2072,7 @@ func TestNonParallelJob(t *testing.T) {
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
// No more Pods are created after the Pod succeeds.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobComplete(ctx, t, clientSet, jobObj)
@ -2117,14 +2117,14 @@ func TestParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Tracks ready pods, if enabled.
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
if _, err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
t.Fatalf("Failed Marking Pods as ready: %v", err)
}
want.Ready = ptr.To[int32](2)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Failed Pods are replaced.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
want = podsByStatus{
@ -2135,7 +2135,7 @@ func TestParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Once one Pod succeeds, no more Pods are created, even if some fail.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
want = podsByStatus{
@ -2146,7 +2146,7 @@ func TestParallelJob(t *testing.T) {
Terminating: ptr.To[int32](0),
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
want = podsByStatus{
@ -2158,7 +2158,7 @@ func TestParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// No more Pods are created after remaining Pods succeed.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
}
validateJobComplete(ctx, t, clientSet, jobObj)
@ -2231,8 +2231,8 @@ func TestParallelJobChangingParallelism(t *testing.T) {
})
// Succeed Job
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
}
validateJobComplete(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -2271,14 +2271,14 @@ func TestParallelJobWithCompletions(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Tracks ready pods, if enabled.
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
if _, err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
t.Fatalf("Failed Marking Pods as ready: %v", err)
}
want.Ready = ptr.To[int32](52)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Failed Pods are replaced.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
want = podsByStatus{
@ -2289,7 +2289,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Pods are created until the number of succeeded Pods equals completions.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
want = podsByStatus{
@ -2301,7 +2301,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// No more Pods are created after the Job completes.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
}
validateJobComplete(ctx, t, clientSet, jobObj)
@ -2391,7 +2391,7 @@ func TestIndexedJob(t *testing.T) {
})
// Remaining Pods succeed.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatal("Failed trying to succeed remaining pods")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -2993,7 +2993,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
})
remaining := int(tc.nPods)
if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
if succ, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
remaining -= succ
b.Logf("Transient failure succeeding pods: %v", err)
return false, nil
@ -3086,7 +3086,7 @@ func BenchmarkLargeFailureHandling(b *testing.B) {
b.StartTimer()
remaining := int(tc.nPods)
if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
if fail, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
remaining -= fail
b.Logf("Transient failure failing pods: %v", err)
return false, nil
@ -3205,7 +3205,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
// Fail a pod ASAP.
err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
return false, nil
}
return true, nil
@ -3241,7 +3241,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
})
// Fail the first pod
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -3252,7 +3252,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
})
// Fail the second pod
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -3341,12 +3341,12 @@ func TestJobFailedWithInterrupts(t *testing.T) {
Terminating: ptr.To[int32](0),
})
t.Log("Finishing pods")
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Could not fail a pod: %v", err)
}
remaining := 9
if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
if succ, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
remaining -= succ
t.Logf("Transient failure succeeding pods: %v", err)
return false, nil
@ -3776,7 +3776,7 @@ func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset
}
}
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (int, error) {
op := func(p *v1.Pod) bool {
p.Status.Phase = phase
if phase == v1.PodFailed || phase == v1.PodSucceeded {
@ -3795,7 +3795,7 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
}
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (int, error) {
op := func(p *v1.Pod) bool {
if podutil.IsPodReady(p) {
return false
@ -3809,10 +3809,10 @@ func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
}
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (int, error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("listing Job Pods: %w", err), 0
return 0, fmt.Errorf("listing Job Pods: %w", err)
}
updates := make([]v1.Pod, 0, cnt)
for _, pod := range pods.Items {
@ -3828,9 +3828,9 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
}
successful, err := updatePodStatuses(ctx, clientSet, updates)
if successful != cnt {
return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful
return successful, fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful)
}
return err, successful
return successful, err
}
func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {