diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index c00ffb6da3f..159b6ca3535 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -113,7 +113,7 @@ type Controller struct { queue workqueue.TypedRateLimitingInterface[string] // Orphan deleted pods that still have a Job tracking finalizer to be removed - orphanQueue workqueue.TypedRateLimitingInterface[string] + orphanQueue workqueue.TypedRateLimitingInterface[orphanPodKey] broadcaster record.EventBroadcaster recorder record.EventRecorder @@ -143,6 +143,23 @@ type syncJobCtx struct { ready int32 } +type orphanPodKeyKind int + +const ( + // "key" + OrphanPodKeyKindName orphanPodKeyKind = iota + // "selector" + OrphanPodKeyKindSelector +) + +type orphanPodKey struct { + // Either "name" or "selector" + kind orphanPodKeyKind + namespace string + // Either "pod name" or "pod selector" + value string +} + // NewController creates a new Job controller that keeps the relevant pods // in sync with their corresponding Job objects. func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) { @@ -162,7 +179,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn expectations: controller.NewControllerExpectations(), finalizerExpectations: newUIDTrackingExpectations(), queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job", Clock: clock}), - orphanQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job_orphan_pod", Clock: clock}), + orphanQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[orphanPodKey](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[orphanPodKey]{Name: "job_orphan_pod", Clock: clock}), broadcaster: eventBroadcaster, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), clock: clock, @@ -513,7 +530,17 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) { return } } - jm.cleanupPodFinalizers(jobObj) + selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector) + if err != nil { + utilruntime.HandleError(fmt.Errorf("job %s/%s has invalid label selector: %w", jobObj.Namespace, jobObj.Name, err)) + return + } + orphanPodKey := orphanPodKey{ + kind: OrphanPodKeyKindSelector, + namespace: jobObj.Namespace, + value: selector.String(), + } + jm.orphanQueue.Add(orphanPodKey) } // enqueueSyncJobImmediately tells the Job controller to invoke syncJob @@ -563,12 +590,12 @@ func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{} } func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return + orphanPodKey := orphanPodKey{ + kind: OrphanPodKeyKindName, + namespace: obj.Namespace, + value: obj.Name, } - jm.orphanQueue.Add(key) + jm.orphanQueue.Add(orphanPodKey) } // worker runs a worker thread that just dequeues items, processes them, and marks them done. @@ -620,37 +647,70 @@ func (jm *Controller) processNextOrphanPod(ctx context.Context) bool { } // syncOrphanPod removes the tracking finalizer from an orphan pod if found. -func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error { +func (jm *Controller) syncOrphanPod(ctx context.Context, key orphanPodKey) error { startTime := jm.clock.Now() logger := klog.FromContext(ctx) defer func() { logger.V(4).Info("Finished syncing orphan pod", "pod", key, "elapsed", jm.clock.Since(startTime)) }() - ns, name, err := cache.SplitMetaNamespaceKey(key) + switch key.kind { + case OrphanPodKeyKindName: + pod, err := jm.podStore.Pods(key.namespace).Get(key.value) + if err != nil { + if apierrors.IsNotFound(err) { + logger.V(4).Info("Orphan pod has been deleted", "pod", klog.KRef(key.namespace, key.value)) + return nil + } + return err + } + return jm.handleSingleOrphanPod(ctx, pod) + case OrphanPodKeyKindSelector: + logger.V(8).Info("syncing all pods matching the label selector", "namespace", key.namespace, "labelSelector", key.value) + return jm.syncOrphanPodsBySelector(ctx, key.namespace, key.value) + default: + return fmt.Errorf("unknown key type: %d", key.kind) + } +} + +// syncOrphanPodsBySelector fetches and processes all pods matching the given label selector. +func (jm *Controller) syncOrphanPodsBySelector(ctx context.Context, namespace string, labelSelector string) error { + logger := klog.FromContext(ctx) + selector, err := labels.Parse(labelSelector) if err != nil { - return err + return fmt.Errorf("invalid label selector: %w", err) } - sharedPod, err := jm.podStore.Pods(ns).Get(name) + // Fetch all pods that match the label selector. + // relatively expensive operation but it is called only from the orphan reconciler + pods, err := jm.podStore.Pods(namespace).List(selector) if err != nil { - if apierrors.IsNotFound(err) { - logger.V(4).Info("Orphan pod has been deleted", "pod", key) - return nil - } return err } + for _, pod := range pods { + if err := jm.handleSingleOrphanPod(ctx, pod); err != nil { + logger.Error(err, "syncing orphan pod failed", "pod", klog.KObj(pod)) + } + } + return nil +} + +// handleSingleOrphanPod processes a single orphan pod. +func (jm *Controller) handleSingleOrphanPod(ctx context.Context, sharedPod *v1.Pod) error { + logger := klog.FromContext(ctx) + ns := sharedPod.Namespace + name := sharedPod.Name // Make sure the pod is still orphaned. if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil { if controllerRef.Kind != controllerKind.Kind || controllerRef.APIVersion != batch.SchemeGroupVersion.String() { // The pod is controlled by an owner that is not a batch/v1 Job. Do not remove finalizer. return nil } - job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef) + job := jm.resolveControllerRef(ns, controllerRef) if job != nil { // Skip cleanup of finalizers for pods owned by a job managed by an external controller if controllerName := managedByExternalController(job); controllerName != nil { - logger.V(2).Info("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller", "key", key, "podUID", sharedPod.UID, "jobUID", job.UID, "controllerName", controllerName) + logger.V(2).Info("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller", "namespace", ns, "name", name, "podUID", sharedPod.UID, "jobUID", job.UID, "controllerName", controllerName) return nil } } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index c2902cf8135..32e8912f47c 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -6641,6 +6641,7 @@ func TestSyncOrphanPod(t *testing.T) { job *batch.Job inCache bool wantFinalizerRemoved bool + podSelector *metav1.LabelSelector }{ "controlled_by_existing_running_job": { owner: &metav1.OwnerReference{ @@ -6743,6 +6744,14 @@ func TestSyncOrphanPod(t *testing.T) { }, wantFinalizerRemoved: true, }, + "orphan_pods_by_label_selector": { + podSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + wantFinalizerRemoved: true, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -6756,19 +6765,35 @@ func TestSyncOrphanPod(t *testing.T) { } }) } - podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer() if tc.owner != nil { podBuilder = podBuilder.owner(*tc.owner) } - orphanPod := podBuilder.Pod - orphanPod, err := clientset.CoreV1().Pods("default").Create(ctx, orphanPod, metav1.CreateOptions{}) + orphanKey := orphanPodKey{ + kind: OrphanPodKeyKindName, + namespace: podBuilder.Pod.Namespace, + value: podBuilder.Pod.Name, + } + if tc.podSelector != nil { + podBuilder = podBuilder.labels(tc.podSelector.MatchLabels) + selector, err := metav1.LabelSelectorAsSelector(tc.podSelector) + if err != nil { + t.Fatalf("Error parsing pod label selector: %v", err) + } + orphanKey = orphanPodKey{ + kind: OrphanPodKeyKindSelector, + namespace: podBuilder.Pod.Namespace, + value: selector.String(), + } + } + orphanPod, err := clientset.CoreV1().Pods("default").Create(ctx, podBuilder.Pod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Creating orphan pod: %v", err) } - err = manager.syncOrphanPod(ctx, cache.MetaObjectToName(orphanPod).String()) + // Sync orphan pod by name or selector + err = manager.syncOrphanPod(ctx, orphanKey) if err != nil { - t.Fatalf("Failed sync orphan pod: %v", err) + t.Fatalf("Failed to sync orphan pod: %v", err) } if tc.wantFinalizerRemoved { if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { @@ -6786,7 +6811,7 @@ func TestSyncOrphanPod(t *testing.T) { time.Sleep(time.Millisecond) orphanPod, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(ctx, orphanPod.Name, metav1.GetOptions{}) if err != nil { - t.Fatalf("Failed to the latest pod: %v", err) + t.Fatalf("Failed to retrieve the latest pod: %v", err) } if !hasJobTrackingFinalizer(orphanPod) { t.Errorf("Unexpected removal of the Job's finalizer") @@ -7796,6 +7821,11 @@ func (pb podBuilder) uid(u string) podBuilder { return pb } +func (pb podBuilder) labels(labels map[string]string) podBuilder { + pb.Labels = labels + return pb +} + func (pb podBuilder) job(j *batch.Job) podBuilder { pb.Labels = j.Spec.Selector.MatchLabels pb.Namespace = j.Namespace