mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Merge pull request #125914 from mimowo/cleanup-job-tests
Cleanup Job tests: align error messages, no TODO ctx, no unused params
This commit is contained in:
commit
bd1f86a154
@ -5287,7 +5287,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
|
|||||||
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
|
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
|
||||||
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
|
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
|
||||||
manager.queue.Add(testutil.GetKey(job, t))
|
manager.queue.Add(testutil.GetKey(job, t))
|
||||||
manager.processNextWorkItem(context.TODO())
|
manager.processNextWorkItem(ctx)
|
||||||
if tc.wantRequeued {
|
if tc.wantRequeued {
|
||||||
verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1)
|
verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1)
|
||||||
} else {
|
} else {
|
||||||
@ -5297,7 +5297,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
|
|||||||
// into the queue asynchronously.
|
// into the queue asynchronously.
|
||||||
manager.clock.Sleep(fastJobApiBackoff)
|
manager.clock.Sleep(fastJobApiBackoff)
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
verifyEmptyQueue(ctx, t, manager)
|
verifyEmptyQueue(t, manager)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -5570,7 +5570,7 @@ func TestGetPodsForJob(t *testing.T) {
|
|||||||
informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
pods, err := jm.getPodsForJob(context.TODO(), job)
|
pods, err := jm.getPodsForJob(ctx, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("getPodsForJob() error: %v", err)
|
t.Fatalf("getPodsForJob() error: %v", err)
|
||||||
}
|
}
|
||||||
@ -5961,7 +5961,7 @@ func TestWatchJobs(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
sharedInformerFactory.Start(stopCh)
|
sharedInformerFactory.Start(stopCh)
|
||||||
go manager.Run(context.TODO(), 1)
|
go manager.Run(ctx, 1)
|
||||||
|
|
||||||
// We're sending new job to see if it reaches syncHandler.
|
// We're sending new job to see if it reaches syncHandler.
|
||||||
testJob.Namespace = "bar"
|
testJob.Namespace = "bar"
|
||||||
@ -6008,7 +6008,7 @@ func TestWatchPods(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
|
go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
|
||||||
go manager.Run(context.TODO(), 1)
|
go manager.Run(ctx, 1)
|
||||||
|
|
||||||
pods := newPodList(1, v1.PodRunning, testJob)
|
pods := newPodList(1, v1.PodRunning, testJob)
|
||||||
testPod := pods[0]
|
testPod := pods[0]
|
||||||
@ -6035,7 +6035,7 @@ func TestWatchOrphanPods(t *testing.T) {
|
|||||||
podInformer := sharedInformers.Core().V1().Pods().Informer()
|
podInformer := sharedInformers.Core().V1().Pods().Informer()
|
||||||
go podInformer.Run(stopCh)
|
go podInformer.Run(stopCh)
|
||||||
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
|
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
|
||||||
go manager.Run(context.TODO(), 1)
|
go manager.Run(ctx, 1)
|
||||||
|
|
||||||
// Create job but don't add it to the store.
|
// Create job but don't add it to the store.
|
||||||
cases := map[string]struct {
|
cases := map[string]struct {
|
||||||
@ -6297,7 +6297,7 @@ func TestJobApiBackoffReset(t *testing.T) {
|
|||||||
// error returned make the key requeued
|
// error returned make the key requeued
|
||||||
fakePodControl.Err = errors.New("Controller error")
|
fakePodControl.Err = errors.New("Controller error")
|
||||||
manager.queue.Add(key)
|
manager.queue.Add(key)
|
||||||
manager.processNextWorkItem(context.TODO())
|
manager.processNextWorkItem(ctx)
|
||||||
retries := manager.queue.NumRequeues(key)
|
retries := manager.queue.NumRequeues(key)
|
||||||
if retries != 1 {
|
if retries != 1 {
|
||||||
t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
|
t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
|
||||||
@ -6307,8 +6307,8 @@ func TestJobApiBackoffReset(t *testing.T) {
|
|||||||
|
|
||||||
// the queue is emptied on success
|
// the queue is emptied on success
|
||||||
fakePodControl.Err = nil
|
fakePodControl.Err = nil
|
||||||
manager.processNextWorkItem(context.TODO())
|
manager.processNextWorkItem(ctx)
|
||||||
verifyEmptyQueue(ctx, t, manager)
|
verifyEmptyQueue(t, manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ workqueue.TypedRateLimitingInterface[string] = &fakeRateLimitingQueue{}
|
var _ workqueue.TypedRateLimitingInterface[string] = &fakeRateLimitingQueue{}
|
||||||
@ -7091,13 +7091,13 @@ func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPol
|
|||||||
|
|
||||||
func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
|
func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
verifyEmptyQueue(ctx, t, jm)
|
verifyEmptyQueue(t, jm)
|
||||||
awaitForQueueLen(ctx, t, jm, wantQueueLen)
|
awaitForQueueLen(ctx, t, jm, wantQueueLen)
|
||||||
}
|
}
|
||||||
|
|
||||||
func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
|
func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
verifyEmptyQueue(ctx, t, jm)
|
verifyEmptyQueue(t, jm)
|
||||||
if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) {
|
if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) {
|
||||||
if requeued := jm.queue.Len() == wantQueueLen; requeued {
|
if requeued := jm.queue.Len() == wantQueueLen; requeued {
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -7109,7 +7109,7 @@ func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQue
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) {
|
func verifyEmptyQueue(t *testing.T, jm *Controller) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if jm.queue.Len() > 0 {
|
if jm.queue.Len() > 0 {
|
||||||
t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len())
|
t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len())
|
||||||
|
@ -83,7 +83,7 @@ func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *baseme
|
|||||||
cmpErr = nil
|
cmpErr = nil
|
||||||
value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...))
|
value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, fmt.Errorf("collecting the %q metric: %q", counterVec.Name, err)
|
return true, fmt.Errorf("collecting the %q metric: %w", counterVec.Name, err)
|
||||||
}
|
}
|
||||||
if wantMetric.Value != int(value) {
|
if wantMetric.Value != int(value) {
|
||||||
cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVec.Name, wantMetric.Labels, wantMetric.Value, int(value))
|
cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVec.Name, wantMetric.Labels, wantMetric.Value, int(value))
|
||||||
@ -92,7 +92,7 @@ func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *baseme
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Failed waiting for expected metric: %q", err)
|
t.Errorf("Failed waiting for expected metric: %v", err)
|
||||||
}
|
}
|
||||||
if cmpErr != nil {
|
if cmpErr != nil {
|
||||||
t.Error(cmpErr)
|
t.Error(cmpErr)
|
||||||
@ -471,8 +471,8 @@ func TestJobPodFailurePolicy(t *testing.T) {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
|
if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
|
||||||
t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
|
t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if test.restartController {
|
if test.restartController {
|
||||||
@ -488,8 +488,8 @@ func TestJobPodFailurePolicy(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if test.wantJobConditionType == batchv1.JobComplete {
|
if test.wantJobConditionType == batchv1.JobComplete {
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||||
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
|
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
|
||||||
@ -1034,8 +1034,8 @@ func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
|
|||||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To("2"))
|
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To("2"))
|
||||||
|
|
||||||
// mark remaining pods are Succeeded and verify Job status
|
// mark remaining pods are Succeeded and verify Job status
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
||||||
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
validateJobFailed(ctx, t, clientSet, jobObj)
|
validateJobFailed(ctx, t, clientSet, jobObj)
|
||||||
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
||||||
@ -1529,11 +1529,11 @@ func TestBackoffLimitPerIndex(t *testing.T) {
|
|||||||
for _, podTermination := range test.podTerminations {
|
for _, podTermination := range test.podTerminations {
|
||||||
pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
|
pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("listing Job Pods: %q", err)
|
t.Fatalf("listing Job Pods: %v", err)
|
||||||
}
|
}
|
||||||
pod.Status = podTermination.status
|
pod.Status = podTermination.status
|
||||||
if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
|
if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
|
||||||
t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err)
|
t.Fatalf("Error updating the pod %q: %v", klog.KObj(pod), err)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
Active: podTermination.wantActive,
|
Active: podTermination.wantActive,
|
||||||
@ -1560,8 +1560,8 @@ func TestBackoffLimitPerIndex(t *testing.T) {
|
|||||||
|
|
||||||
remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive
|
remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive
|
||||||
if remainingActive > 0 {
|
if remainingActive > 0 {
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
|
||||||
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
|
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
|
||||||
@ -1789,7 +1789,7 @@ func TestManagedBy_Reenabling(t *testing.T) {
|
|||||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||||
|
|
||||||
// Marking the pod as finished, but it does not result in updating of the Job status.
|
// Marking the pod as finished, but it does not result in updating of the Job status.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||||
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
|
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1855,7 +1855,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Marking the pod as complete queues the job reconciliation
|
// Marking the pod as complete queues the job reconciliation
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||||
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
|
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2053,7 +2053,7 @@ func TestNonParallelJob(t *testing.T) {
|
|||||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||||
|
|
||||||
// Failed Pod is replaced.
|
// Failed Pod is replaced.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
@ -2072,7 +2072,7 @@ func TestNonParallelJob(t *testing.T) {
|
|||||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||||
|
|
||||||
// No more Pods are created after the Pod succeeds.
|
// No more Pods are created after the Pod succeeds.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
validateJobComplete(ctx, t, clientSet, jobObj)
|
validateJobComplete(ctx, t, clientSet, jobObj)
|
||||||
@ -2117,14 +2117,14 @@ func TestParallelJob(t *testing.T) {
|
|||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
|
|
||||||
// Tracks ready pods, if enabled.
|
// Tracks ready pods, if enabled.
|
||||||
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
|
if _, err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
|
||||||
t.Fatalf("Failed Marking Pods as ready: %v", err)
|
t.Fatalf("Failed Marking Pods as ready: %v", err)
|
||||||
}
|
}
|
||||||
want.Ready = ptr.To[int32](2)
|
want.Ready = ptr.To[int32](2)
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
|
|
||||||
// Failed Pods are replaced.
|
// Failed Pods are replaced.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
want = podsByStatus{
|
want = podsByStatus{
|
||||||
@ -2135,7 +2135,7 @@ func TestParallelJob(t *testing.T) {
|
|||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
// Once one Pod succeeds, no more Pods are created, even if some fail.
|
// Once one Pod succeeds, no more Pods are created, even if some fail.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
want = podsByStatus{
|
want = podsByStatus{
|
||||||
@ -2146,7 +2146,7 @@ func TestParallelJob(t *testing.T) {
|
|||||||
Terminating: ptr.To[int32](0),
|
Terminating: ptr.To[int32](0),
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
want = podsByStatus{
|
want = podsByStatus{
|
||||||
@ -2158,7 +2158,7 @@ func TestParallelJob(t *testing.T) {
|
|||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
// No more Pods are created after remaining Pods succeed.
|
// No more Pods are created after remaining Pods succeed.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
validateJobComplete(ctx, t, clientSet, jobObj)
|
validateJobComplete(ctx, t, clientSet, jobObj)
|
||||||
@ -2231,8 +2231,8 @@ func TestParallelJobChangingParallelism(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Succeed Job
|
// Succeed Job
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
validateJobComplete(ctx, t, clientSet, jobObj)
|
validateJobComplete(ctx, t, clientSet, jobObj)
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
@ -2271,14 +2271,14 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
// Tracks ready pods, if enabled.
|
// Tracks ready pods, if enabled.
|
||||||
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
|
if _, err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
|
||||||
t.Fatalf("Failed Marking Pods as ready: %v", err)
|
t.Fatalf("Failed Marking Pods as ready: %v", err)
|
||||||
}
|
}
|
||||||
want.Ready = ptr.To[int32](52)
|
want.Ready = ptr.To[int32](52)
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
|
|
||||||
// Failed Pods are replaced.
|
// Failed Pods are replaced.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
want = podsByStatus{
|
want = podsByStatus{
|
||||||
@ -2289,7 +2289,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
// Pods are created until the number of succeeded Pods equals completions.
|
// Pods are created until the number of succeeded Pods equals completions.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
want = podsByStatus{
|
want = podsByStatus{
|
||||||
@ -2301,7 +2301,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
|
||||||
// No more Pods are created after the Job completes.
|
// No more Pods are created after the Job completes.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
validateJobComplete(ctx, t, clientSet, jobObj)
|
validateJobComplete(ctx, t, clientSet, jobObj)
|
||||||
@ -2391,7 +2391,7 @@ func TestIndexedJob(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Remaining Pods succeed.
|
// Remaining Pods succeed.
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||||
t.Fatal("Failed trying to succeed remaining pods")
|
t.Fatal("Failed trying to succeed remaining pods")
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
@ -2993,7 +2993,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
|
|||||||
})
|
})
|
||||||
remaining := int(tc.nPods)
|
remaining := int(tc.nPods)
|
||||||
if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
||||||
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
|
if succ, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
|
||||||
remaining -= succ
|
remaining -= succ
|
||||||
b.Logf("Transient failure succeeding pods: %v", err)
|
b.Logf("Transient failure succeeding pods: %v", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
@ -3086,7 +3086,7 @@ func BenchmarkLargeFailureHandling(b *testing.B) {
|
|||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
remaining := int(tc.nPods)
|
remaining := int(tc.nPods)
|
||||||
if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
||||||
if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
|
if fail, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
|
||||||
remaining -= fail
|
remaining -= fail
|
||||||
b.Logf("Transient failure failing pods: %v", err)
|
b.Logf("Transient failure failing pods: %v", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
@ -3205,7 +3205,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
|
|||||||
|
|
||||||
// Fail a pod ASAP.
|
// Fail a pod ASAP.
|
||||||
err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -3241,7 +3241,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Fail the first pod
|
// Fail the first pod
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
@ -3252,7 +3252,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Fail the second pod
|
// Fail the second pod
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
@ -3341,12 +3341,12 @@ func TestJobFailedWithInterrupts(t *testing.T) {
|
|||||||
Terminating: ptr.To[int32](0),
|
Terminating: ptr.To[int32](0),
|
||||||
})
|
})
|
||||||
t.Log("Finishing pods")
|
t.Log("Finishing pods")
|
||||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||||
t.Fatalf("Could not fail a pod: %v", err)
|
t.Fatalf("Could not fail a pod: %v", err)
|
||||||
}
|
}
|
||||||
remaining := 9
|
remaining := 9
|
||||||
if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||||
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
|
if succ, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
|
||||||
remaining -= succ
|
remaining -= succ
|
||||||
t.Logf("Transient failure succeeding pods: %v", err)
|
t.Logf("Transient failure succeeding pods: %v", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
@ -3776,7 +3776,7 @@ func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
|
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (int, error) {
|
||||||
op := func(p *v1.Pod) bool {
|
op := func(p *v1.Pod) bool {
|
||||||
p.Status.Phase = phase
|
p.Status.Phase = phase
|
||||||
if phase == v1.PodFailed || phase == v1.PodSucceeded {
|
if phase == v1.PodFailed || phase == v1.PodSucceeded {
|
||||||
@ -3795,7 +3795,7 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
|
|||||||
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
|
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
|
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (int, error) {
|
||||||
op := func(p *v1.Pod) bool {
|
op := func(p *v1.Pod) bool {
|
||||||
if podutil.IsPodReady(p) {
|
if podutil.IsPodReady(p) {
|
||||||
return false
|
return false
|
||||||
@ -3809,10 +3809,10 @@ func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj
|
|||||||
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
|
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
|
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (int, error) {
|
||||||
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("listing Job Pods: %w", err), 0
|
return 0, fmt.Errorf("listing Job Pods: %w", err)
|
||||||
}
|
}
|
||||||
updates := make([]v1.Pod, 0, cnt)
|
updates := make([]v1.Pod, 0, cnt)
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
@ -3828,9 +3828,9 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
|
|||||||
}
|
}
|
||||||
successful, err := updatePodStatuses(ctx, clientSet, updates)
|
successful, err := updatePodStatuses(ctx, clientSet, updates)
|
||||||
if successful != cnt {
|
if successful != cnt {
|
||||||
return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful
|
return successful, fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful)
|
||||||
}
|
}
|
||||||
return err, successful
|
return successful, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {
|
func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user