Job: Replace deprecated wait functions with supported one

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
This commit is contained in:
Yuki Iwai 2023-10-18 22:03:13 +09:00
parent dc8b57d8a7
commit d7556769e7
2 changed files with 32 additions and 32 deletions

View File

@ -2115,7 +2115,7 @@ func TestPastDeadlineJobFinished(t *testing.T) {
}
var j *batch.Job
err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, 200*time.Microsecond, 3*time.Second, true, func(ctx context.Context) (done bool, err error) {
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
if err != nil {
return false, err
@ -2125,7 +2125,7 @@ func TestPastDeadlineJobFinished(t *testing.T) {
if err != nil {
t.Errorf("Job failed to ensure that start time was set: %v", err)
}
err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (done bool, err error) {
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
if err != nil {
return false, nil
@ -4666,7 +4666,7 @@ func TestWatchOrphanPods(t *testing.T) {
t.Fatalf("Creating orphan pod: %v", err)
}
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{})
if err != nil {
return false, err
@ -5201,7 +5201,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
uids = sets.New(string(pods[2].UID))
var diff string
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
diff = cmp.Diff(uids, gotExpectedUIDs)
return diff == "", nil

View File

@ -141,9 +141,9 @@ func TestMetricsOnSuccesses(t *testing.T) {
validateJobSucceeded(ctx, t, clientSet, jobObj)
// verify metric values after the job is finished
validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
validateCounterMetric(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
validateTerminatedPodsTrackingFinalizerMetric(t, int(*jobObj.Spec.Parallelism))
validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
validateCounterMetric(ctx, t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
validateTerminatedPodsTrackingFinalizerMetric(ctx, t, int(*jobObj.Spec.Parallelism))
})
}
}
@ -281,15 +281,15 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
validateJobFailed(ctx, t, clientSet, jobObj)
// verify metric values after the job is finished
validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
})
}
}
func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
t.Helper()
var cmpErr error
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
cmpErr = nil
value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...))
if err != nil {
@ -309,12 +309,12 @@ func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wan
}
}
func validateTerminatedPodsTrackingFinalizerMetric(t *testing.T, want int) {
validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
func validateTerminatedPodsTrackingFinalizerMetric(ctx context.Context, t *testing.T, want int) {
validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
Value: want,
Labels: []string{metrics.Add},
})
validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
Value: want,
Labels: []string{metrics.Delete},
})
@ -412,8 +412,8 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
// removed. The finalizer will be removed by the job controller just after
// appending the FailureTarget condition to the job to mark it as targeted
// for failure.
go func() {
err := wait.PollImmediate(10*time.Millisecond, time.Minute, func() (bool, error) {
go func(ctx context.Context) {
err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Minute, true, func(ctx context.Context) (bool, error) {
failedPodUpdated, err := cs.CoreV1().Pods(jobObj.Namespace).Get(ctx, jobPods[failedIndex].Name, metav1.GetOptions{})
if err != nil {
return true, err
@ -427,7 +427,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
t.Logf("Failed awaiting for the finalizer removal for pod %v", klog.KObj(jobPods[failedIndex]))
}
wg.Done()
}()
}(ctx)
// We update one pod as failed with state matching the pod failure policy rule. This results in removal
// of the pod finalizer from the pod by the job controller.
@ -676,7 +676,7 @@ func TestJobPodFailurePolicy(t *testing.T) {
}
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
if test.wantPodFailuresHandledByPolicyRuleMetric != nil {
validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
validateCounterMetric(ctx, t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
}
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
@ -1456,7 +1456,7 @@ func TestParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
validateTerminatedPodsTrackingFinalizerMetric(t, 7)
validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7)
})
}
}
@ -1665,7 +1665,7 @@ func TestIndexedJob(t *testing.T) {
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3", nil)
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 5)
}
func TestJobPodReplacementPolicy(t *testing.T) {
@ -1806,7 +1806,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
// Wait for pods to start up.
err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
return false, err
@ -1966,7 +1966,7 @@ func TestElasticIndexedJob(t *testing.T) {
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
// Wait for pods to start up.
err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
return false, err
@ -2141,7 +2141,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
}
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
// Pods never finished, so they are not counted in the metric.
validateTerminatedPodsTrackingFinalizerMetric(t, 0)
validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 0)
})
}
}
@ -2172,7 +2172,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
}
// Fail a pod ASAP.
err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
return false, nil
}
@ -2305,7 +2305,7 @@ func TestJobFailedWithInterrupts(t *testing.T) {
t.Fatalf("Could not fail a pod: %v", err)
}
remaining := 9
if err := wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
remaining -= succ
t.Logf("Transient failure succeeding pods: %v", err)
@ -2324,7 +2324,7 @@ func TestJobFailedWithInterrupts(t *testing.T) {
func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
t.Helper()
orphanPods := 0
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
})
@ -2440,7 +2440,7 @@ func TestSuspendJob(t *testing.T) {
if got, want := getJobConditionStatus(ctx, job, batchv1.JobSuspended), status; got != want {
t.Errorf("Unexpected Job condition %q status after %s: got %q, want %q", batchv1.JobSuspended, s, got, want)
}
if err := waitForEvent(events, job.UID, reason); err != nil {
if err := waitForEvent(ctx, events, job.UID, reason); err != nil {
t.Errorf("Waiting for event with reason %q after %s: %v", reason, s, err)
}
}
@ -2507,7 +2507,7 @@ func TestNodeSelectorUpdate(t *testing.T) {
// (2) Check that the pod was created using the expected node selector.
var pod *v1.Pod
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
pods, err := clientSet.CoreV1().Pods(jobNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
@ -2549,7 +2549,7 @@ type podsByStatus struct {
func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
t.Helper()
var actualCounts podsByStatus
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get updated Job: %v", err)
@ -2571,7 +2571,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
t.Helper()
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
var active []*v1.Pod
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
@ -2671,11 +2671,11 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients
}
}
func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
func waitForEvent(ctx context.Context, events watch.Interface, uid types.UID, reason string) error {
if reason == "" {
return nil
}
return wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
return wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
for {
var ev watch.Event
select {
@ -2716,7 +2716,7 @@ func validateJobSucceeded(ctx context.Context, t testing.TB, clientSet clientset
func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
t.Helper()
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to obtain updated Job: %v", err)