From d7556769e7fdcdb0f8290198da6c49e23bb50c24 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Wed, 18 Oct 2023 22:03:13 +0900 Subject: [PATCH] Job: Replace deprecated wait functions with supported one Signed-off-by: Yuki Iwai --- pkg/controller/job/job_controller_test.go | 8 ++-- test/integration/job/job_test.go | 56 +++++++++++------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index c3b5682957b..647795273a7 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -2115,7 +2115,7 @@ func TestPastDeadlineJobFinished(t *testing.T) { } var j *batch.Job - err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, 200*time.Microsecond, 3*time.Second, true, func(ctx context.Context) (done bool, err error) { j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) if err != nil { return false, err @@ -2125,7 +2125,7 @@ func TestPastDeadlineJobFinished(t *testing.T) { if err != nil { t.Errorf("Job failed to ensure that start time was set: %v", err) } - err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (done bool, err error) { j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) if err != nil { return false, nil @@ -4666,7 +4666,7 @@ func TestWatchOrphanPods(t *testing.T) { t.Fatalf("Creating orphan pod: %v", err) } - if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) if err != nil { return false, err @@ -5201,7 +5201,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { uids = sets.New(string(pods[2].UID)) var diff string - if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) diff = cmp.Diff(uids, gotExpectedUIDs) return diff == "", nil diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 8e98fd6b5f8..8222f0f5d1f 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -141,9 +141,9 @@ func TestMetricsOnSuccesses(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) // verify metric values after the job is finished - validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) - validateCounterMetric(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric) - validateTerminatedPodsTrackingFinalizerMetric(t, int(*jobObj.Spec.Parallelism)) + validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) + validateCounterMetric(ctx, t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric) + validateTerminatedPodsTrackingFinalizerMetric(ctx, t, int(*jobObj.Spec.Parallelism)) }) } } @@ -281,15 +281,15 @@ func TestJobFinishedNumReasonMetric(t *testing.T) { validateJobFailed(ctx, t, clientSet, jobObj) // verify metric values after the job is finished - validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) + validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) }) } } -func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) { +func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) { t.Helper() var cmpErr error - err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) { cmpErr = nil value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...)) if err != nil { @@ -309,12 +309,12 @@ func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wan } } -func validateTerminatedPodsTrackingFinalizerMetric(t *testing.T, want int) { - validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{ +func validateTerminatedPodsTrackingFinalizerMetric(ctx context.Context, t *testing.T, want int) { + validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{ Value: want, Labels: []string{metrics.Add}, }) - validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{ + validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{ Value: want, Labels: []string{metrics.Delete}, }) @@ -412,8 +412,8 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi // removed. The finalizer will be removed by the job controller just after // appending the FailureTarget condition to the job to mark it as targeted // for failure. - go func() { - err := wait.PollImmediate(10*time.Millisecond, time.Minute, func() (bool, error) { + go func(ctx context.Context) { + err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Minute, true, func(ctx context.Context) (bool, error) { failedPodUpdated, err := cs.CoreV1().Pods(jobObj.Namespace).Get(ctx, jobPods[failedIndex].Name, metav1.GetOptions{}) if err != nil { return true, err @@ -427,7 +427,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi t.Logf("Failed awaiting for the finalizer removal for pod %v", klog.KObj(jobPods[failedIndex])) } wg.Done() - }() + }(ctx) // We update one pod as failed with state matching the pod failure policy rule. This results in removal // of the pod finalizer from the pod by the job controller. @@ -676,7 +676,7 @@ func TestJobPodFailurePolicy(t *testing.T) { } validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType) if test.wantPodFailuresHandledByPolicyRuleMetric != nil { - validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric) + validateCounterMetric(ctx, t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric) } validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -1456,7 +1456,7 @@ func TestParallelJob(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) - validateTerminatedPodsTrackingFinalizerMetric(t, 7) + validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7) }) } } @@ -1665,7 +1665,7 @@ func TestIndexedJob(t *testing.T) { validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3", nil) validateJobSucceeded(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) - validateTerminatedPodsTrackingFinalizerMetric(t, 5) + validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 5) } func TestJobPodReplacementPolicy(t *testing.T) { @@ -1806,7 +1806,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) // Wait for pods to start up. - err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { return false, err @@ -1966,7 +1966,7 @@ func TestElasticIndexedJob(t *testing.T) { jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) // Wait for pods to start up. - err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { return false, err @@ -2141,7 +2141,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { } validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) // Pods never finished, so they are not counted in the metric. - validateTerminatedPodsTrackingFinalizerMetric(t, 0) + validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 0) }) } } @@ -2172,7 +2172,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { } // Fail a pod ASAP. - err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { return false, nil } @@ -2305,7 +2305,7 @@ func TestJobFailedWithInterrupts(t *testing.T) { t.Fatalf("Could not fail a pod: %v", err) } remaining := 9 - if err := wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil { remaining -= succ t.Logf("Transient failure succeeding pods: %v", err) @@ -2324,7 +2324,7 @@ func TestJobFailedWithInterrupts(t *testing.T) { func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { t.Helper() orphanPods := 0 - if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { + if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{ LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector), }) @@ -2440,7 +2440,7 @@ func TestSuspendJob(t *testing.T) { if got, want := getJobConditionStatus(ctx, job, batchv1.JobSuspended), status; got != want { t.Errorf("Unexpected Job condition %q status after %s: got %q, want %q", batchv1.JobSuspended, s, got, want) } - if err := waitForEvent(events, job.UID, reason); err != nil { + if err := waitForEvent(ctx, events, job.UID, reason); err != nil { t.Errorf("Waiting for event with reason %q after %s: %v", reason, s, err) } } @@ -2507,7 +2507,7 @@ func TestNodeSelectorUpdate(t *testing.T) { // (2) Check that the pod was created using the expected node selector. var pod *v1.Pod - if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { pods, err := clientSet.CoreV1().Pods(jobNamespace).List(ctx, metav1.ListOptions{}) if err != nil { t.Fatalf("Failed to list Job Pods: %v", err) @@ -2549,7 +2549,7 @@ type podsByStatus struct { func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) { t.Helper() var actualCounts podsByStatus - if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to get updated Job: %v", err) @@ -2571,7 +2571,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse t.Helper() validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired) var active []*v1.Pod - if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { t.Fatalf("Failed to list Job Pods: %v", err) @@ -2671,11 +2671,11 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients } } -func waitForEvent(events watch.Interface, uid types.UID, reason string) error { +func waitForEvent(ctx context.Context, events watch.Interface, uid types.UID, reason string) error { if reason == "" { return nil } - return wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { + return wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { for { var ev watch.Event select { @@ -2716,7 +2716,7 @@ func validateJobSucceeded(ctx context.Context, t testing.TB, clientSet clientset func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) { t.Helper() - if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to obtain updated Job: %v", err)