diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 0df6a4ef6ba..9337dc222d6 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -151,9 +151,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 			jm.enqueueController(obj, true)
 		},
 		UpdateFunc: jm.updateJob,
-		DeleteFunc: func(obj interface{}) {
-			jm.enqueueController(obj, true)
-		},
+		DeleteFunc: jm.deleteJob,
 	})
 	jm.jobLister = jobInformer.Lister()
 	jm.jobStoreSynced = jobInformer.Informer().HasSynced
@@ -238,7 +236,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta
 	return job
 }
 
-// When a pod is created, enqueue the controller that manages it and update it's expectations.
+// When a pod is created, enqueue the controller that manages it and update its expectations.
 func (jm *Controller) addPod(obj interface{}) {
 	pod := obj.(*v1.Pod)
 	if pod.DeletionTimestamp != nil {
@@ -263,7 +261,12 @@ func (jm *Controller) addPod(obj interface{}) {
 		return
 	}
 
-	// Otherwise, it's an orphan. Get a list of all matching controllers and sync
+	// Otherwise, it's an orphan.
+	// Clean the finalizer.
+	if hasJobTrackingFinalizer(pod) {
+		jm.enqueueOrphanPod(pod)
+	}
+	// Get a list of all matching controllers and sync
 	// them to see if anyone wants to adopt it.
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
@@ -333,7 +336,12 @@ func (jm *Controller) updatePod(old, cur interface{}) {
 		return
 	}
 
-	// Otherwise, it's an orphan. If anything changed, sync matching controllers
+	// Otherwise, it's an orphan.
+	// Clean the finalizer.
+	if hasJobTrackingFinalizer(curPod) {
+		jm.enqueueOrphanPod(curPod)
+	}
+	// If anything changed, sync matching controllers
 	// to see if anyone wants to adopt it now.
 	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 	if labelChanged || controllerRefChanged {
@@ -366,13 +374,18 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
 	}
 	controllerRef := metav1.GetControllerOf(pod)
+	hasFinalizer := hasJobTrackingFinalizer(pod)
 	if controllerRef == nil {
 		// No controller should care about orphans being deleted.
+		// But this pod might have belonged to a Job and the GC removed the reference.
+		if hasFinalizer {
+			jm.enqueueOrphanPod(pod)
+		}
 		return
 	}
 	job := jm.resolveControllerRef(pod.Namespace, controllerRef)
 	if job == nil {
-		if hasJobTrackingFinalizer(pod) {
+		if hasFinalizer {
 			jm.enqueueOrphanPod(pod)
 		}
 		return
 	}
@@ -385,7 +398,7 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
 
 	// Consider the finalizer removed if this is the final delete. Otherwise,
 	// it's an update for the deletion timestamp, then check finalizer.
-	if final || !hasJobTrackingFinalizer(pod) {
+	if final || !hasFinalizer {
 		jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
 	}
 
@@ -421,6 +434,37 @@ func (jm *Controller) updateJob(old, cur interface{}) {
 	}
 }
 
+// deleteJob enqueues the job and all the pods associated with it that still
+// have a finalizer.
+func (jm *Controller) deleteJob(obj interface{}) {
+	jm.enqueueController(obj, true)
+	jobObj, ok := obj.(*batch.Job)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
+			return
+		}
+		jobObj, ok = tombstone.Obj.(*batch.Job)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a job %+v", obj))
+			return
+		}
+	}
+	// Listing pods shouldn't really fail, as we are just querying the informer cache.
+	selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
+		return
+	}
+	pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector)
+	for _, pod := range pods {
+		if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) {
+			jm.enqueueOrphanPod(pod)
+		}
+	}
+}
+
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
 // immediate tells the controller to update the status right away, and should
 // happen ONLY when there was a successful pod run.
@@ -538,6 +582,14 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
 		}
 		return err
 	}
+	// Make sure the pod is still orphaned.
+	if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
+		job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
+		if job != nil {
+			// The pod was adopted. Do not remove finalizer.
+			return nil
+		}
+	}
 	if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
 		if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
 			return err
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index bda45b40615..e46e77e7b96 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -36,20 +36,26 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/util/feature"
+	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
+	"k8s.io/client-go/metadata"
+	"k8s.io/client-go/metadata/metadatainformer"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/controller-manager/pkg/informerfactory"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/utils/pointer"
 )
 
-const waitInterval = 500 * time.Millisecond
+const waitInterval = time.Second
 
 // TestNonParallelJob tests that a Job that only executes one Pod. The test
 // recreates the Job controller at some points to make sure a new controller
@@ -61,7 +67,7 @@ func TestNonParallelJob(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -79,7 +85,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	// Restarting controller.
 	cancel()
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// Failed Pod is replaced.
 	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
@@ -92,7 +98,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	// Restarting controller.
 	cancel()
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// No more Pods are created after the Pod succeeds.
 	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@@ -132,7 +138,7 @@ func TestParallelJob(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -220,7 +226,7 @@ func TestParallelJobParallelism(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -296,7 +302,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
 			closeFn, restConfig, clientSet, ns := setup(t, "completions")
 			defer closeFn()
-			ctx, cancel := startJobController(restConfig, clientSet)
+			ctx, cancel := startJobController(restConfig)
 			defer cancel()
 
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -376,7 +382,7 @@ func TestIndexedJob(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -447,7 +453,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -477,7 +483,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	}
 
 	// Restart controller.
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// Ensure Job continues to be tracked and finalizers are removed.
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -503,7 +509,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	}
 
 	// Restart controller.
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// Ensure Job continues to be tracked and finalizers are removed.
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -513,13 +519,77 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	}, false)
 }
 
-func TestOrphanPodsFinalizersCleared(t *testing.T) {
+func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
+	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground} {
+		t.Run(string(policy), func(t *testing.T) {
+			closeFn, restConfig, clientSet, ns := setup(t, "simple")
+			defer closeFn()
+			informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0)
+			// Make the job controller significantly slower to trigger race condition.
+			restConfig.QPS = 1
+			restConfig.Burst = 1
+			jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
+			defer cancel()
+			restConfig.QPS = 200
+			restConfig.Burst = 200
+			runGC := createGC(ctx, t, restConfig, informerSet)
+			informerSet.Start(ctx.Done())
+			go jc.Run(ctx, 1)
+			runGC()
+
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Parallelism: pointer.Int32Ptr(5),
+				},
+			})
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+			if !hasJobTrackingAnnotation(jobObj) {
+				t.Error("apiserver didn't add the tracking annotation")
+			}
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+				Active: 5,
+			}, true)
+
+			// Delete Job. The GC should delete the pods in cascade.
+			err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
+				PropagationPolicy: &policy,
+			})
+			if err != nil {
+				t.Fatalf("Failed to delete job: %v", err)
+			}
+			orphanPods := 0
+			if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
+				pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
+					LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
+				})
+				if err != nil {
+					return false, err
+				}
+				orphanPods = 0
+				for _, pod := range pods.Items {
+					if hasJobTrackingFinalizer(&pod) {
+						orphanPods++
+					}
+				}
+				return orphanPods == 0, nil
+			}); err != nil {
+				t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
+				t.Logf("Last saw %d orphan pods", orphanPods)
+			}
+		})
+	}
+}
+
+func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
 	// Step 0: job created while feature is enabled.
 	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -550,15 +620,15 @@ func TestOrphanPodsFinalizersCleared(t *testing.T) {
 	}
 
 	// Restart controller.
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 	if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
 		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
 		if err != nil {
-			t.Fatalf("Falied to list Job Pods: %v", err)
+			t.Fatalf("Failed to list Job Pods: %v", err)
 		}
 		sawPods := false
 		for _, pod := range pods.Items {
-			if isPodOwnedByJob(&pod, jobObj) {
+			if metav1.IsControlledBy(&pod, jobObj) {
 				if hasJobTrackingFinalizer(&pod) {
 					return false, nil
 				}
@@ -600,7 +670,7 @@ func TestSuspendJob(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 			defer closeFn()
-			ctx, cancel := startJobController(restConfig, clientSet)
+			ctx, cancel := startJobController(restConfig)
 			defer cancel()
 			events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
 			if err != nil {
@@ -650,7 +720,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -680,7 +750,7 @@ func TestNodeSelectorUpdate(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 	defer closeFn()
 
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer cancel()
 
 	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
@@ -780,7 +850,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
 		active = nil
 		for _, pod := range pods.Items {
 			phase := pod.Status.Phase
-			if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
+			if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
 				p := pod
 				active = append(active, &p)
 			}
@@ -806,7 +876,7 @@ func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSe
 	}
 	for _, pod := range pods.Items {
 		phase := pod.Status.Phase
-		if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
+		if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
 			t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
 		}
 	}
@@ -830,7 +900,7 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients
 	}
 	gotActive := sets.NewInt()
 	for _, pod := range pods.Items {
-		if isPodOwnedByJob(&pod, jobObj) {
+		if metav1.IsControlledBy(&pod, jobObj) {
 			if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
 				ix, err := getCompletionIndex(&pod)
 				if err != nil {
@@ -931,7 +1001,7 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
 		if len(updates) == cnt {
 			break
 		}
-		if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
+		if p := pod.Status.Phase; metav1.IsControlledBy(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
 			if !op(&pod) {
 				continue
 			}
@@ -975,7 +1045,7 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job
 		return fmt.Errorf("listing Job Pods: %w", err)
 	}
 	for _, pod := range pods.Items {
-		if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
+		if p := pod.Status.Phase; !metav1.IsControlledBy(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
 			continue
 		}
 		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
@@ -1001,15 +1071,6 @@ func getCompletionIndex(p *v1.Pod) (int, error) {
 	return strconv.Atoi(v)
 }
-func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
-	for _, owner := range p.ObjectMeta.OwnerReferences {
-		if owner.Kind == "Job" && owner.UID == j.UID {
-			return true
-		}
-	}
-	return false
-}
-
 func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) {
 	if jobObj.Name == "" {
 		jobObj.Name = "test-job"
 	}
@@ -1046,16 +1107,55 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
 	return closeFn, &config, clientSet, ns
 }
 
-func startJobController(restConfig *restclient.Config, clientSet clientset.Interface) (context.Context, context.CancelFunc) {
-	ctx, cancel := context.WithCancel(context.Background())
-	resyncPeriod := 12 * time.Hour
-	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod)
-	jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+func startJobController(restConfig *restclient.Config) (context.Context, context.CancelFunc) {
+	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
+	jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(ctx, 1)
 	return ctx, cancel
 }
 
+func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
+	clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
+	ctx, cancel := context.WithCancel(context.Background())
+	jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	return jc, ctx, cancel
+}
+
+func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() {
+	restConfig = restclient.AddUserAgent(restConfig, "gc-controller")
+	clientSet := clientset.NewForConfigOrDie(restConfig)
+	metadataClient, err := metadata.NewForConfig(restConfig)
+	if err != nil {
+		t.Fatalf("Failed to create metadataClient: %v", err)
+	}
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
+	restMapper.Reset()
+	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
+	gc, err := garbagecollector.NewGarbageCollector(
+		clientSet,
+		metadataClient,
+		restMapper,
+		garbagecollector.DefaultIgnoredResources(),
+		informerfactory.NewInformerFactory(informerSet, metadataInformers),
+		alwaysStarted,
+	)
+	if err != nil {
+		t.Fatalf("Failed creating garbage collector")
+	}
+	startGC := func() {
+		syncPeriod := 5 * time.Second
+		go wait.Until(func() {
+			restMapper.Reset()
+		}, syncPeriod, ctx.Done())
+		go gc.Run(ctx, 1)
+		go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done())
+	}
+	return startGC
+}
+
 func hasJobTrackingFinalizer(obj metav1.Object) bool {
 	for _, fin := range obj.GetFinalizers() {
 		if fin == batchv1.JobTrackingFinalizer {