Keep the delete handler and update handler consistent for finished jobs and offload the main queue

This commit is contained in:
Harish Kuna
2024-09-19 22:53:38 +00:00
parent f0c7640337
commit 0615ded5f7

View File

@@ -486,6 +486,12 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
if err != nil {
return
}
// The job shouldn't be marked as finished until all pod finalizers are removed, so
// this is a backup operation.
if util.IsJobFinished(curJob) {
jm.enqueueLabelSelector(curJob)
return
}
if curJob.Generation == oldJob.Generation {
// Delay the Job sync when no generation change to batch Job status updates,
@@ -530,6 +536,10 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
return
}
}
jm.enqueueLabelSelector(jobObj)
}
func (jm *Controller) enqueueLabelSelector(jobObj *batch.Job) {
selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("job %s/%s has invalid label selector: %w", jobObj.Namespace, jobObj.Name, err))
@@ -826,9 +836,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// if job was finished previously, we don't want to redo the termination
if util.IsJobFinished(&job) {
// The job shouldn't be marked as finished until all pod finalizers are removed.
// Cleaning pod finalizers one more time just in case.
jm.cleanupPodFinalizers(&job)
err := jm.podBackoffStore.removeBackoffRecord(key)
if err != nil {
// re-syncing here as the record has to be removed for finished/deleted jobs
@@ -2072,21 +2079,6 @@ func onlyReplaceFailedPods(job *batch.Job) bool {
return job.Spec.PodFailurePolicy != nil
}
// cleanupPodFinalizers lists the pods in the job's namespace that match the
// job's label selector and passes each pod that is controlled by the job, and
// for which hasJobTrackingFinalizer reports true, to enqueueOrphanPod.
//
// If the job's selector cannot be converted, the error is reported through
// utilruntime.HandleError and no pods are processed.
func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
	podSelector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
		return
	}

	// The List error is intentionally dropped: listing pods shouldn't really
	// fail, as we are just querying the informer cache.
	candidates, _ := jm.podStore.Pods(job.Namespace).List(podSelector)
	for _, candidate := range candidates {
		// Skip pods that are not controlled by this job.
		if !metav1.IsControlledBy(candidate, job) {
			continue
		}
		// Skip pods for which the tracking-finalizer check is false.
		if !hasJobTrackingFinalizer(candidate) {
			continue
		}
		jm.enqueueOrphanPod(candidate)
	}
}
func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {
reason := metrics.PodCreateNew
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {