Merge pull request #127487 from hakuna-matatah/jobperf-delete-eventhandler

Offload the main Job reconciler w.r.t cleaning finalizers
This commit is contained in:
Kubernetes Prow Robot 2024-09-23 18:08:06 +01:00 committed by GitHub
commit 851cf43a35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -486,6 +486,12 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
if err != nil {
return
}
// The job shouldn't be marked as finished until all pod finalizers are removed, so
// this is a backup operation.
if util.IsJobFinished(curJob) {
jm.enqueueLabelSelector(curJob)
return
}
if curJob.Generation == oldJob.Generation {
// Delay the Job sync when no generation change to batch Job status updates,
@ -530,6 +536,10 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
return
}
}
jm.enqueueLabelSelector(jobObj)
}
func (jm *Controller) enqueueLabelSelector(jobObj *batch.Job) {
selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("job %s/%s has invalid label selector: %w", jobObj.Namespace, jobObj.Name, err))
@ -826,9 +836,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// if job was finished previously, we don't want to redo the termination
if util.IsJobFinished(&job) {
// The job shouldn't be marked as finished until all pod finalizers are removed.
// Cleaning pod finalizers one more time just in case.
jm.cleanupPodFinalizers(&job)
err := jm.podBackoffStore.removeBackoffRecord(key)
if err != nil {
// re-syncing here as the record has to be removed for finished/deleted jobs
@ -2072,21 +2079,6 @@ func onlyReplaceFailedPods(job *batch.Job) bool {
return job.Spec.PodFailurePolicy != nil
}
func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
// Listing pods shouldn't really fail, as we are just querying the informer cache.
selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
return
}
pods, _ := jm.podStore.Pods(job.Namespace).List(selector)
for _, pod := range pods {
if metav1.IsControlledBy(pod, job) && hasJobTrackingFinalizer(pod) {
jm.enqueueOrphanPod(pod)
}
}
}
func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {
reason := metrics.PodCreateNew
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {