cleanup after JobPodFailurePolicy is promoted to GA

carlory 2024-07-15 17:50:47 +08:00
parent d0545c8eb4
commit dae05f3b88
3 changed files with 18 additions and 25 deletions


@@ -98,7 +98,7 @@ func calculateFailedIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod)
 func isIndexFailed(logger klog.Logger, job *batch.Job, pod *v1.Pod) bool {
     isPodFailedCounted := false
     if isPodFailed(pod, job) {
-        if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
+        if job.Spec.PodFailurePolicy != nil {
            _, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
            if action != nil && *action == batch.PodFailurePolicyActionFailIndex {
                return true
@@ -361,7 +361,7 @@ func getNewIndexFailureCounts(logger klog.Logger, job *batch.Job, podBeingReplac
     if podBeingReplaced != nil {
         indexFailureCount := parseIndexFailureCountAnnotation(logger, podBeingReplaced)
         indexIgnoredFailureCount := parseIndexFailureIgnoreCountAnnotation(logger, podBeingReplaced)
-        if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
+        if job.Spec.PodFailurePolicy != nil {
            _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, podBeingReplaced)
            if countFailed {
                indexFailureCount++

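As a point of reference for the nil checks above (not part of this commit): a minimal sketch of a podFailurePolicy built with the public k8s.io/api/batch/v1 types. The exit code 42 and the rule mix are invented for illustration; FailIndex is the action isIndexFailed looks for, and an Ignore rule is the kind of match that is not counted against the index failure counters.

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
)

func main() {
    // Illustrative only: a spec like this is what the now-unconditional
    // job.Spec.PodFailurePolicy != nil checks refer to after GA.
    policy := &batchv1.PodFailurePolicy{
        Rules: []batchv1.PodFailurePolicyRule{
            {
                // FailIndex marks the pod's completion index as failed
                // (meaningful for Indexed Jobs using backoffLimitPerIndex).
                Action: batchv1.PodFailurePolicyActionFailIndex,
                OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
                    Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
                    Values:   []int32{42}, // hypothetical exit code
                },
            },
            {
                // Ignore means the failure is not counted; this is what the
                // countFailed result of matchPodFailurePolicy reflects.
                Action: batchv1.PodFailurePolicyActionIgnore,
                OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
                    {Type: corev1.DisruptionTarget, Status: corev1.ConditionTrue},
                },
            },
        },
    }
    fmt.Println(len(policy.Rules)) // 2
}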

@@ -844,7 +844,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
     // Given that the Job already has the SuccessCriteriaMet condition, the termination condition already had confirmed in another cycle.
     // So, the job-controller evaluates the podFailurePolicy only when the Job doesn't have the SuccessCriteriaMet condition.
-    if jobCtx.finishedCondition == nil && (feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || delayTerminalCondition()) {
+    if jobCtx.finishedCondition == nil {
         failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget)
         if failureTargetCondition != nil && failureTargetCondition.Status == v1.ConditionTrue {
             jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
@@ -1026,7 +1026,7 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods
 func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
     result := len(failedPods)
-    if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
+    if jobCtx.job.Spec.PodFailurePolicy != nil {
         for _, p := range failedPods {
             _, countFailed, _ := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, p)
             if !countFailed {
@@ -1141,7 +1141,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
             // When the job is considered finished, every non-terminated pod is considered failed.
             ix := getCompletionIndex(pod.Annotations)
             if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
-                if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
+                if jobCtx.job.Spec.PodFailurePolicy != nil {
                     _, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
                     if action != nil {
                         podFailureCountByPolicyAction[string(*action)] += 1
@@ -1186,17 +1186,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
             needsFlush = true
         }
     }
-    if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || delayTerminalCondition() {
-        if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
+    if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {

-            // Append the interim FailureTarget condition to update the job status with before finalizers are removed.
-            jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
-            needsFlush = true
+        // Append the interim FailureTarget condition to update the job status with before finalizers are removed.
+        jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
+        needsFlush = true

-            // Prepare the final Failed condition to update the job status with after the finalizers are removed.
-            // It is also used in the enactJobFinished function for reporting.
-            jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now())
-        }
+        // Prepare the final Failed condition to update the job status with after the finalizers are removed.
+        // It is also used in the enactJobFinished function for reporting.
+        jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now())
     }
     if isSuccessCriteriaMetCondition(jobCtx.finishedCondition) {
         // Append the interim SuccessCriteriaMet condition to update the job status with before finalizers are removed.
@@ -1496,7 +1494,7 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu
 // getFailJobMessage returns a job failure message if the job should fail with the current counters
 func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
-    if !feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || job.Spec.PodFailurePolicy == nil {
+    if job.Spec.PodFailurePolicy == nil {
         return nil
     }
     for _, p := range pods {
@@ -1903,8 +1901,9 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
 }

 func isPodFailed(p *v1.Pod, job *batch.Job) bool {
-    if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-        // Orphan Pods and unschedulable terminating Pods are marked as Failed. So we only need to check the phase.
+    if job.Spec.PodFailurePolicy != nil {
+        // Orphan Pods and unschedulable terminating Pods are marked as Failed,
+        // so we only need to check the phase.
         return p.Status.Phase == v1.PodFailed
     }
     if p.Status.Phase == v1.PodFailed {
@@ -2001,7 +2000,7 @@ func trackTerminatingPods(job *batch.Job) bool {
     if feature.DefaultFeatureGate.Enabled(features.JobManagedBy) {
         return true
     }
-    return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
+    return job.Spec.PodFailurePolicy != nil
 }

 // This checks if we should apply PodReplacementPolicy.
@@ -2014,7 +2013,7 @@ func onlyReplaceFailedPods(job *batch.Job) bool {
     if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && job.Spec.PodReplacementPolicy != nil && *job.Spec.PodReplacementPolicy == batch.Failed {
         return true
     }
-    return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
+    return job.Spec.PodFailurePolicy != nil
 }

 func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {

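As a rough, illustrative sketch of what the last two hunks amount to (not code from this commit; it uses the public batch/v1 types and elides the JobManagedBy and JobPodReplacementPolicy gate checks that the real helpers still perform): once JobPodFailurePolicy is GA, simply setting a podFailurePolicy opts the Job into terminating-pod tracking and failed-only pod replacement.

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
)

// Simplified stand-ins for the controller helpers shown in the diff, rewritten
// against the public API types; the real helpers also consult feature gates
// (JobManagedBy, JobPodReplacementPolicy), which are omitted here.
func trackTerminatingPods(job *batchv1.Job) bool {
    return job.Spec.PodFailurePolicy != nil
}

func onlyReplaceFailedPods(job *batchv1.Job) bool {
    if job.Spec.PodReplacementPolicy != nil && *job.Spec.PodReplacementPolicy == batchv1.Failed {
        return true
    }
    return job.Spec.PodFailurePolicy != nil
}

func main() {
    // A schematically empty podFailurePolicy; a real one must carry rules.
    job := &batchv1.Job{Spec: batchv1.JobSpec{
        PodFailurePolicy: &batchv1.PodFailurePolicy{},
    }}
    fmt.Println(trackTerminatingPods(job), onlyReplaceFailedPods(job)) // true true
}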

@@ -98,9 +98,6 @@ func (jobStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
     job.Generation = 1
-    if !utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
-        job.Spec.PodFailurePolicy = nil
-    }
     if !utilfeature.DefaultFeatureGate.Enabled(features.JobManagedBy) {
         job.Spec.ManagedBy = nil
     }
@@ -137,9 +134,6 @@ func (jobStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object
     oldJob := old.(*batch.Job)
     newJob.Status = oldJob.Status
-    if !utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && oldJob.Spec.PodFailurePolicy == nil {
-        newJob.Spec.PodFailurePolicy = nil
-    }
     if !utilfeature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) && oldJob.Spec.SuccessPolicy == nil {
         newJob.Spec.SuccessPolicy = nil
     }
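For context on why dropping these strategy branches is safe: while a field is guarded by an alpha/beta gate, PrepareForCreate and PrepareForUpdate clear it whenever the gate is off (and, on update, the old object did not already use it); once the gate is GA and locked on, that branch can never fire. Below is a minimal sketch of the pattern, with a hypothetical dropPodFailurePolicy helper and a plain bool standing in for the utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) check; it is not code from this commit and uses the public batch/v1 types rather than the internal API package.

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
)

// dropPodFailurePolicy mimics the removed strategy logic: clear the field when
// the gate is disabled, unless the existing object already set it.
// gateEnabled stands in for the JobPodFailurePolicy feature-gate check.
func dropPodFailurePolicy(newJob, oldJob *batchv1.Job, gateEnabled bool) {
    if gateEnabled {
        return
    }
    if oldJob == nil || oldJob.Spec.PodFailurePolicy == nil {
        newJob.Spec.PodFailurePolicy = nil
    }
}

func main() {
    newJob := &batchv1.Job{Spec: batchv1.JobSpec{
        PodFailurePolicy: &batchv1.PodFailurePolicy{},
    }}

    // With the feature GA, the gate is locked to true, so the field always
    // survives; the clearing branch is dead code, which is why the commit removes it.
    dropPodFailurePolicy(newJob, nil, true)
    fmt.Println(newJob.Spec.PodFailurePolicy != nil) // true
}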