diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index d385a4de3d7..6135d4884a1 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -470,7 +470,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 	}
 
 	// if curJob is finished, remove the finalizer as a backup check.
-	if curJob.Status.CompletionTime != nil {
+	if IsJobFinished(curJob) {
 		jm.backupRemovePodFinalizers(curJob)
 	}
 
@@ -1873,20 +1873,16 @@ func onlyReplaceFailedPods(job *batch.Job) bool {
 	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
 }
 
-func (jm *Controller) backupRemovePodFinalizers(obj interface{}) {
-	jobObj, ok := obj.(*batch.Job)
-
-	if !ok {
-		return
-	}
-	selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
+func (jm *Controller) backupRemovePodFinalizers(job *batch.Job) {
+	// Listing pods shouldn't really fail, as we are just querying the informer cache.
+	selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
 		return
 	}
-	pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector)
+	pods, _ := jm.podStore.Pods(job.Namespace).List(selector)
 	for _, pod := range pods {
-		if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) {
+		if metav1.IsControlledBy(pod, job) && hasJobTrackingFinalizer(pod) {
 			jm.enqueueOrphanPod(pod)
 		}
 	}
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index a84ffc7c101..b3a442fe3f7 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -62,12 +62,16 @@ import (
 	"k8s.io/utils/pointer"
 )
 
-var realClock = &clock.RealClock{}
-var alwaysReady = func() bool { return true }
+var (
+	realClock   = &clock.RealClock{}
+	alwaysReady = func() bool { return true }
+)
 
-const fastSyncJobBatchPeriod = 10 * time.Millisecond
-const fastJobApiBackoff = 10 * time.Millisecond
-const fastRequeue = 10 * time.Millisecond
+const (
+	fastSyncJobBatchPeriod = 10 * time.Millisecond
+	fastJobApiBackoff      = 10 * time.Millisecond
+	fastRequeue            = 10 * time.Millisecond
+)
 
 // testFinishedAt represents time one second later than unix epoch
 // this will be used in various test cases where we don't want back-off to kick in
@@ -2151,7 +2155,6 @@ func TestSingleJobFailedCondition(t *testing.T) {
 	if failedConditions[0].Status != v1.ConditionTrue {
 		t.Errorf("Unexpected status for the failed condition. Expected: %v, saw %v\n", v1.ConditionTrue, failedConditions[0].Status)
 	}
-
 }
 
 func TestSyncJobComplete(t *testing.T) {
@@ -4698,9 +4701,11 @@ func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
 func (f *fakeRateLimitingQueue) Forget(item interface{}) {
 	f.requeues = 0
 }
+
 func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
 	return f.requeues
 }
+
 func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
 	f.item = item
 	f.duration = duration
@@ -4788,57 +4793,79 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 	}{
 		"backoffLimit 0 should have 1 pod active": {
 			1, 1, 0,
-			false, []int32{0}, v1.PodRunning,
+			false,
+			[]int32{0},
+			v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
 			1, 1, 1,
-			false, []int32{0}, v1.PodRunning,
+			false,
+			[]int32{0},
+			v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
 			1, 1, 1,
-			false, []int32{1}, v1.PodRunning,
+			false,
+			[]int32{1},
+			v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
 			1, 1, 1,
-			false, []int32{1}, v1.PodPending,
+			false,
+			[]int32{1},
+			v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - single pod": {
 			1, 5, 2,
-			false, []int32{2}, v1.PodRunning,
+			false,
+			[]int32{2},
+			v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - single pod": {
 			1, 5, 2,
-			false, []int32{2}, v1.PodPending,
+			false,
+			[]int32{2},
+			v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - multiple pods": {
 			2, 5, 2,
-			false, []int32{1, 1}, v1.PodRunning,
+			false,
+			[]int32{1, 1},
+			v1.PodRunning,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - multiple pods": {
 			2, 5, 2,
-			false, []int32{1, 1}, v1.PodPending,
+			false,
+			[]int32{1, 1},
+			v1.PodPending,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures": {
 			2, 5, 3,
-			false, []int32{1, 1}, v1.PodRunning,
+			false,
+			[]int32{1, 1},
+			v1.PodRunning,
 			2, 0, 0, nil, "",
 		},
 		"suspending a job": {
 			2, 4, 6,
-			true, []int32{1, 1}, v1.PodRunning,
+			true,
+			[]int32{1, 1},
+			v1.PodRunning,
 			0, 0, 0, &jobConditionSuspended, "JobSuspended",
 		},
 		"finished job": {
 			2, 4, 6,
-			true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
+			true,
+			[]int32{1, 1, 2, 0},
+			v1.PodSucceeded,
 			0, 4, 0, &jobConditionComplete, "",
 		},
 	}
@@ -4871,7 +4898,6 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 
 			// run
 			err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
-
 			if err != nil {
 				t.Errorf("unexpected error syncing job. Got %#v", err)
 			}
@@ -5172,6 +5198,54 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
 	}
 }
 
+func TestBackupFinalizers(t *testing.T) {
+	_, ctx := ktesting.NewTestContext(t)
+	clientset := fake.NewSimpleClientset()
+	sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
+	manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	manager.podStoreSynced = alwaysReady
+	manager.jobStoreSynced = alwaysReady
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	podInformer := sharedInformers.Core().V1().Pods().Informer()
+	go podInformer.Run(stopCh)
+	cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
+
+	// 1. Create the controller but do not start the workers.
+	// This is done above by initializing the manager and not calling manager.Run() yet.
+
+	// 2. Create a job.
+	job := newJob(2, 2, 6, batch.NonIndexedCompletion)
+
+	// 3. Create the pods.
+	podBuilder := buildPod().name("test_pod").deletionTimestamp().trackingFinalizer().job(job)
+	pod, err := clientset.CoreV1().Pods("default").Create(context.Background(), podBuilder.Pod, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Creating pod: %v", err)
+	}
+
+	// 4. Finish the job.
+	job.Status.Conditions = append(job.Status.Conditions, batch.JobCondition{
+		Type:   batch.JobComplete,
+		Status: v1.ConditionTrue,
+	})
+
+	// 5. Start the workers.
+	go manager.Run(context.TODO(), 1)
+
+	// Check if the finalizer has been removed from the pod.
+	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+		p, err := clientset.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		return !hasJobTrackingFinalizer(p), nil
+	}); err != nil {
+		t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
+	}
+}
+
 func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) {
 	t.Helper()
 	labels := p.GetLabels()
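
A note on the controller change above: only successful Jobs get status.completionTime, so the old curJob.Status.CompletionTime != nil guard never ran the backup finalizer removal for Jobs that ended in failure. IsJobFinished checks the terminal conditions instead, covering both outcomes. For reference, a minimal sketch of such a helper, modeled on the job controller's existing utility; this sketch is not part of the diff:

package job

import (
	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

// IsJobFinished reports whether the Job has reached a terminal state.
// Both Complete and Failed count as finished, whereas
// status.completionTime is only ever set when a Job succeeds.
func IsJobFinished(j *batch.Job) bool {
	for _, c := range j.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}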
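
TestBackupFinalizers polls hasJobTrackingFinalizer until the backup path strips the finalizer from the pod. That helper is defined elsewhere in the package and is not shown in this diff; a sketch of the usual check, assuming the upstream batch.JobTrackingFinalizer constant ("batch.kubernetes.io/job-tracking") and the same imports as the sketch above:

// hasJobTrackingFinalizer reports whether the Pod still carries the
// job-tracking finalizer that the Job controller removes once it has
// accounted for the Pod in the Job's status.
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
	for _, finalizer := range pod.Finalizers {
		if finalizer == batch.JobTrackingFinalizer {
			return true
		}
	}
	return false
}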