Remove Legacy Job Tracking

kannon92 2022-12-21 22:11:27 +00:00
parent b3138ba1b3
commit 6dfaeff33c
5 changed files with 284 additions and 592 deletions

View File

@@ -52,17 +52,13 @@ type orderedIntervals []interval
 // empty list if this Job is not tracked with finalizers. The new list includes
 // the indexes that succeeded since the last sync.
 func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
-    var prevIntervals orderedIntervals
-    withFinalizers := hasJobTrackingAnnotation(job)
-    if withFinalizers {
-        prevIntervals = succeededIndexesFromJob(job)
-    }
+    prevIntervals := succeededIndexesFromJob(job)
     newSucceeded := sets.NewInt()
     for _, p := range pods {
         ix := getCompletionIndex(p.Annotations)
         // Succeeded Pod with valid index and, if tracking with finalizers,
         // has a finalizer (meaning that it is not counted yet).
-        if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && (!withFinalizers || hasJobTrackingFinalizer(p)) {
+        if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) {
             newSucceeded.Insert(ix)
         }
     }
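
With the legacy path gone, calculateSucceededIndexes always seeds the previous intervals from the Job status and only counts a succeeded Pod while its tracking finalizer is still present. The finalizer helper itself is not part of this hunk; the following is a minimal sketch of such a check (assuming the batch.JobTrackingFinalizer constant from k8s.io/api/batch/v1), not necessarily the controller's exact code:

package job

import (
    batch "k8s.io/api/batch/v1"
    v1 "k8s.io/api/core/v1"
)

// hasJobTrackingFinalizer reports whether the Pod still carries the Job
// tracking finalizer, i.e. its completion has not been counted yet.
func hasJobTrackingFinalizer(p *v1.Pod) bool {
    for _, fin := range p.Finalizers {
        if fin == batch.JobTrackingFinalizer {
            return true
        }
    }
    return false
}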

View File

@@ -29,12 +29,11 @@ const noIndex = "-"
 func TestCalculateSucceededIndexes(t *testing.T) {
     cases := map[string]struct {
         prevSucceeded string
         pods []indexPhase
         completions int32
-        trackingWithFinalizers bool
         wantStatusIntervals orderedIntervals
         wantIntervals orderedIntervals
     }{
         "one index": {
             pods: []indexPhase{{"1", v1.PodSucceeded}},
@@ -65,19 +64,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
             completions: 8,
             wantIntervals: []interval{{2, 3}, {5, 7}},
         },
-        "one interval, ignore previous": {
-            prevSucceeded: "3-5",
-            pods: []indexPhase{
-                {"0", v1.PodSucceeded},
-                {"1", v1.PodFailed},
-                {"1", v1.PodSucceeded},
-                {"2", v1.PodSucceeded},
-                {"2", v1.PodSucceeded},
-                {"3", v1.PodFailed},
-            },
-            completions: 4,
-            wantIntervals: []interval{{0, 2}},
-        },
         "one index and one interval": {
             pods: []indexPhase{
                 {"0", v1.PodSucceeded},
@@ -107,18 +93,16 @@ func TestCalculateSucceededIndexes(t *testing.T) {
             wantIntervals: []interval{{0, 2}, {4, 4}},
         },
         "prev interval out of range": {
             prevSucceeded: "0-5,8-10",
             completions: 8,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{{0, 5}},
             wantIntervals: []interval{{0, 5}},
         },
         "prev interval partially out of range": {
             prevSucceeded: "0-5,8-10",
             completions: 10,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{{0, 5}, {8, 9}},
             wantIntervals: []interval{{0, 5}, {8, 9}},
         },
         "prev and new separate": {
             prevSucceeded: "0,4,5,10-12",
@@ -127,8 +111,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
                 {"7", v1.PodSucceeded},
                 {"8", v1.PodSucceeded},
             },
             completions: 13,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{
                 {0, 0},
                 {4, 5},
@@ -149,8 +132,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
                 {"7", v1.PodSucceeded},
                 {"8", v1.PodSucceeded},
             },
             completions: 9,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{
                 {3, 4},
                 {6, 6},
@@ -167,8 +149,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
                 {"4", v1.PodSucceeded},
                 {"6", v1.PodSucceeded},
             },
             completions: 9,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{
                 {2, 2},
                 {7, 8},
@@ -186,8 +167,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
                 {"5", v1.PodSucceeded},
                 {"9", v1.PodSucceeded},
             },
             completions: 10,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{
                 {2, 7},
             },
@@ -202,8 +182,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
             pods: []indexPhase{
                 {"3", v1.PodSucceeded},
             },
             completions: 4,
-            trackingWithFinalizers: true,
             wantStatusIntervals: []interval{
                 {0, 0},
             },
@@ -223,11 +202,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
                     Completions: pointer.Int32(tc.completions),
                 },
             }
-            if tc.trackingWithFinalizers {
-                job.Annotations = map[string]string{
-                    batch.JobTrackingFinalizer: "",
-                }
-            }
             pods := hollowPodsWithIndexPhase(tc.pods)
             for _, p := range pods {
                 p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
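
The test cases above describe previously counted completions with interval strings such as "0-5,8-10" and expect them back as ordered intervals. As a rough sketch only (not the parser behind succeededIndexesFromJob), such a string can be expanded like this, assuming well-formed input and ignoring the clamping to .spec.completions that the controller performs:

package job

import (
    "strconv"
    "strings"
)

// interval mirrors the {first, last} pairs asserted in the tests above.
type interval struct {
    First, Last int
}

// parseIndexIntervals expands a completedIndexes-style string, e.g.
// "0,4,5,10-12", into ordered intervals. Malformed entries are skipped.
// Sketch only; the hypothetical name and error handling are for illustration.
func parseIndexIntervals(s string) []interval {
    var out []interval
    if s == "" {
        return out
    }
    for _, part := range strings.Split(s, ",") {
        if first, last, ok := strings.Cut(part, "-"); ok {
            f, err1 := strconv.Atoi(first)
            l, err2 := strconv.Atoi(last)
            if err1 == nil && err2 == nil {
                out = append(out, interval{f, l})
            }
        } else if ix, err := strconv.Atoi(part); err == nil {
            out = append(out, interval{ix, ix})
        }
    }
    return out
}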

View File

@@ -614,9 +614,9 @@ func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error {
 
 // getPodsForJob returns the set of pods that this Job should manage.
 // It also reconciles ControllerRef by adopting/orphaning, adding tracking
-// finalizers, if enabled.
+// finalizers.
 // Note that the returned Pods are pointers into the cache.
-func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) {
+func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) {
     selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
     if err != nil {
         return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@@ -639,14 +639,10 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinal
         }
         return fresh, nil
     })
-    var finalizers []string
-    if withFinalizers {
-        finalizers = append(finalizers, batch.JobTrackingFinalizer)
-    }
-    cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...)
+    cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, batch.JobTrackingFinalizer)
     // When adopting Pods, this operation adds an ownerRef and finalizers.
     pods, err = cm.ClaimPods(ctx, pods)
-    if err != nil || !withFinalizers {
+    if err != nil {
         return pods, err
     }
     // Set finalizer on adopted pods for the remaining calculations.
@@ -718,23 +714,18 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
         metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
     }()
 
-    var expectedRmFinalizers sets.String
-    var uncounted *uncountedTerminatedPods
-    if hasJobTrackingAnnotation(&job) {
-        klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
-        if job.Status.UncountedTerminatedPods == nil {
-            job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
-        }
-        uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-        expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
+    if job.Status.UncountedTerminatedPods == nil {
+        job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
     }
+    uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
+    expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)
 
     // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
     // and update the expectations after we've retrieved active pods from the store. If a new pod enters
     // the store after we've checked the expectation, the job sync is just deferred till the next relist.
     satisfiedExpectations := jm.expectations.SatisfiedExpectations(key)
 
-    pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil)
+    pods, err := jm.getPodsForJob(ctx, &job)
     if err != nil {
         return false, err
     }
@@ -767,13 +758,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
         if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
             finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
         } else if failJobMessage := getFailJobMessage(&job, pods, uncounted.Failed()); failJobMessage != nil {
-            if uncounted != nil {
-                // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-                finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
-            } else {
-                // Prepare the Failed job condition for the legacy path without finalizers (don't use the interim FailureTarget condition).
-                finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
-            }
+            // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
+            finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
         }
     }
     if finishedCondition == nil {
@@ -799,16 +785,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
     // Remove active pods if Job failed.
     if finishedCondition != nil {
         deleted, err := jm.deleteActivePods(ctx, &job, activePods)
-        if uncounted == nil {
-            // Legacy behavior: pretend all active pods were successfully removed.
-            deleted = active
-        } else if deleted != active || !satisfiedExpectations {
+        if deleted != active || !satisfiedExpectations {
             // Can't declare the Job as finished yet, as there might be remaining
             // pod finalizers or pods that are not in the informer's cache yet.
             finishedCondition = nil
         }
         active -= deleted
-        failed += deleted
         manageJobErr = err
     } else {
         manageJobCalled := false
@@ -872,56 +854,19 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
     // In this case, we should clear the backoff delay.
     forget = job.Status.Succeeded < succeeded
-    if uncounted != nil {
-        needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
-        job.Status.Active = active
-        job.Status.Ready = ready
-        err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
-        if err != nil {
-            return false, fmt.Errorf("tracking status: %w", err)
-        }
-        jobFinished := IsJobFinished(&job)
-        if jobHasNewFailure && !jobFinished {
-            // returning an error will re-enqueue Job after the backoff period
-            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
-        }
-        forget = true
-        return forget, manageJobErr
-    }
-    // Legacy path: tracking without finalizers.
-
-    // Ensure that there are no leftover tracking finalizers.
-    if err := jm.removeTrackingFinalizersFromAllPods(ctx, pods); err != nil {
-        return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
-    }
-
-    // no need to update the job if the status hasn't changed since last time
-    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || !equalReady(job.Status.Ready, ready) || suspendCondChanged || finishedCondition != nil {
-        job.Status.Active = active
-        job.Status.Succeeded = succeeded
-        job.Status.Failed = failed
-        job.Status.Ready = ready
-        if isIndexedJob(&job) {
-            job.Status.CompletedIndexes = succeededIndexes.String()
-        }
-        job.Status.UncountedTerminatedPods = nil
-        jobFinished := jm.enactJobFinished(&job, finishedCondition)
-        if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
-            return forget, err
-        }
-        if jobFinished {
-            jm.recordJobFinished(&job, finishedCondition)
-        }
-        if jobHasNewFailure && !IsJobFinished(&job) {
-            // returning an error will re-enqueue Job after the backoff period
-            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
-        }
-        forget = true
-    }
+    needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
+    job.Status.Active = active
+    job.Status.Ready = ready
+    err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
+    if err != nil {
+        return false, fmt.Errorf("tracking status: %w", err)
+    }
+    jobFinished := IsJobFinished(&job)
+    if jobHasNewFailure && !jobFinished {
+        // returning an error will re-enqueue Job after the backoff period
+        return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
+    }
+    forget = true
     return forget, manageJobErr
 }
@@ -986,23 +931,6 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
     return successfulDeletes, errorFromChannel(errCh)
 }
 
-// removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called
-// when Job tracking with finalizers is disabled.
-func (jm *Controller) removeTrackingFinalizersFromAllPods(ctx context.Context, pods []*v1.Pod) error {
-    var podsWithFinalizer []*v1.Pod
-    for _, pod := range pods {
-        if hasJobTrackingFinalizer(pod) {
-            podsWithFinalizer = append(podsWithFinalizer, pod)
-        }
-    }
-    if len(podsWithFinalizer) == 0 {
-        return nil
-    }
-    // Tracking with finalizers is disabled, no need to set expectations.
-    _, err := jm.removeTrackingFinalizerFromPods(ctx, "", podsWithFinalizer)
-    return err
-}
-
 // trackJobStatusAndRemoveFinalizers does:
 // 1. Add finished Pods to .status.uncountedTerminatedPods
 // 2. Remove the finalizers from the Pods if they completed or were removed
@@ -1053,7 +981,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
             // considerTerminated || job.DeletionTimestamp != nil
             considerTerminated = podutil.IsPodTerminal(pod) ||
                 finishedCond != nil || // The Job is terminating. Any running Pod is considered failed.
-                isPodFailed(pod, job, true /* using finalizers */)
+                isPodFailed(pod, job)
         }
         if podutil.IsPodTerminal(pod) || considerTerminated || job.DeletionTimestamp != nil {
             podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
@@ -1361,7 +1289,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
         return nil
     }
     for _, p := range pods {
-        if isPodFailed(p, job, uncounted != nil) {
+        if isPodFailed(p, job) {
             jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
             if jobFailureMessage != nil {
                 return jobFailureMessage
@@ -1374,22 +1302,20 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
 // getStatus returns number of succeeded and failed pods running a job. The number
 // of failed pods can be affected by the podFailurePolicy.
 func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) {
-    if uncounted != nil {
-        succeeded = job.Status.Succeeded
-        failed = job.Status.Failed
-    }
+    succeeded = job.Status.Succeeded
+    failed = job.Status.Failed
     succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
         return p.Status.Phase == v1.PodSucceeded
     }))
     failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
         if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-            if !isPodFailed(p, job, uncounted != nil) {
+            if !isPodFailed(p, job) {
                 return false
             }
             _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
             return countFailed
         } else {
-            return isPodFailed(p, job, uncounted != nil)
+            return isPodFailed(p, job)
         }
     }))
     return succeeded, failed
@@ -1487,9 +1413,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
     if isIndexedJob(job) {
         addCompletionIndexEnvVariables(podTemplate)
     }
-    if hasJobTrackingAnnotation(job) {
-        podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
-    }
+    podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
 
     // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
     // and double with each successful iteration in a kind of "slow start".
@@ -1646,14 +1570,6 @@ func getCompletionMode(job *batch.Job) string {
     return string(batch.NonIndexedCompletion)
 }
 
-func hasJobTrackingAnnotation(job *batch.Job) bool {
-    if job.Annotations == nil {
-        return false
-    }
-    _, ok := job.Annotations[batch.JobTrackingFinalizer]
-    return ok
-}
-
 func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string {
     for _, fin := range finalizers {
         if fin == batch.JobTrackingFinalizer {
@@ -1739,7 +1655,7 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
     return list, false
 }
 
-func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool {
+func isPodFailed(p *v1.Pod, job *batch.Job) bool {
     if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
         // When PodDisruptionConditions is enabled, orphan Pods and unschedulable
         // terminating Pods are marked as Failed. So we only need to check the phase.
@@ -1751,10 +1667,9 @@ func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool {
     if p.Status.Phase == v1.PodFailed {
         return true
     }
-    // When tracking with finalizers: counting deleted Pods as failures to
-    // account for orphan Pods that never have a chance to reach the Failed
-    // phase.
-    return wFinalizers && p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
+    // Count deleted Pods as failures to account for orphan Pods that
+    // never have a chance to reach the Failed phase.
+    return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
 }
 
 func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {
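
Because the controller now always tracks with finalizers, syncJob unconditionally initializes .status.uncountedTerminatedPods and dereferences it when building the uncounted helper used by getStatus and trackJobStatusAndRemoveFinalizers. The wrapper itself is not shown in this diff; the following is a minimal sketch of the idea (the Succeeded/Failed UID lists come from the batch/v1 API type, while the set-based wrapper shape here is an assumption for illustration):

package job

import (
    batch "k8s.io/api/batch/v1"
    "k8s.io/apimachinery/pkg/util/sets"
)

// uncountedTerminatedPods is a sketch of the in-memory view of
// .status.uncountedTerminatedPods: UIDs of finished Pods whose tracking
// finalizer has not been removed yet, so they are not reflected in
// .status.succeeded or .status.failed.
type uncountedTerminatedPods struct {
    succeeded sets.String
    failed    sets.String
}

func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
    obj := &uncountedTerminatedPods{
        succeeded: sets.NewString(),
        failed:    sets.NewString(),
    }
    for _, v := range in.Succeeded {
        obj.succeeded.Insert(string(v))
    }
    for _, v := range in.Failed {
        obj.failed.Insert(string(v))
    }
    return obj
}

// Succeeded and Failed are the accessors consulted when counting pods.
func (u *uncountedTerminatedPods) Succeeded() sets.String { return u.succeeded }
func (u *uncountedTerminatedPods) Failed() sets.String    { return u.failed }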

View File

@@ -141,9 +141,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
     for i := 0; i < count; i++ {
         newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
         newPod.Status = v1.PodStatus{Phase: status}
-        if hasJobTrackingAnnotation(job) {
-            newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
-        }
+        newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
         pods = append(pods, newPod)
     }
     return pods
@@ -184,9 +182,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
             }
             p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index)
         }
-        if hasJobTrackingAnnotation(job) {
-            p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
-        }
+        p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
         podIndexer.Add(p)
     }
 }
@@ -207,9 +203,6 @@ func TestControllerSyncJob(t *testing.T) {
         wasSuspended bool
         suspend bool
 
-        // If set, it means that the case is exclusive to tracking with/without finalizers.
-        wFinalizersExclusive *bool
-
         // pod setup
         podControllerError error
         jobKeyForget bool
@@ -495,7 +488,6 @@
             expectedPodPatches: 1,
         },
         "job failures, unsatisfied expectations": {
-            wFinalizersExclusive: pointer.Bool(true),
             parallelism: 2,
             completions: 5,
             deleting: true,
@@ -725,175 +717,161 @@
     }
     for name, tc := range testCases {
-        for _, wFinalizers := range []bool{false, true} {
-            t.Run(fmt.Sprintf("%s, finalizers=%t", name, wFinalizers), func(t *testing.T) {
-                if wFinalizers && tc.podControllerError != nil {
-                    t.Skip("Can't track status if finalizers can't be removed")
-                }
-                if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers {
-                    t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
-                }
+        t.Run(name, func(t *testing.T) {
+            if tc.podControllerError != nil {
+                t.Skip("Can't track status if finalizers can't be removed")
+            }
             defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
 
             // job manager setup
             clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
             manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
             fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
             manager.podControl = &fakePodControl
             manager.podStoreSynced = alwaysReady
             manager.jobStoreSynced = alwaysReady
 
             // job & pods setup
             job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
             job.Spec.Suspend = pointer.Bool(tc.suspend)
             key, err := controller.KeyFunc(job)
             if err != nil {
                 t.Errorf("Unexpected error getting job key: %v", err)
             }
             if tc.fakeExpectationAtCreation < 0 {
                 manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation))
             } else if tc.fakeExpectationAtCreation > 0 {
                 manager.expectations.ExpectCreations(key, int(tc.fakeExpectationAtCreation))
             }
             if tc.wasSuspended {
                 job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now()))
             }
-            if wFinalizers {
-                job.Annotations = map[string]string{
-                    batch.JobTrackingFinalizer: "",
-                }
-            }
             if tc.deleting {
                 now := metav1.Now()
                 job.DeletionTimestamp = &now
             }
             sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
             podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
             setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods)
             setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
 
             actual := job
             manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
                 actual = job
                 return job, nil
             }
 
             // run
             forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
 
             // We need requeue syncJob task if podController error
             if tc.podControllerError != nil {
                 if err == nil {
                     t.Error("Syncing jobs expected to return error on podControl exception")
                 }
             } else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
                 if err == nil {
                     t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
                 }
             } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit {
                 if err == nil {
                     t.Error("Syncing jobs expected to return error when reached the podControl limit")
                 }
             } else if err != nil {
                 t.Errorf("Unexpected error when syncing jobs: %v", err)
             }
             if forget != tc.jobKeyForget {
                 t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
             }
             // validate created/deleted pods
             if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
                 t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates))
             }
             if tc.completionMode == batch.IndexedCompletion {
                 checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name)
             } else {
                 for _, p := range fakePodControl.Templates {
                     // Fake pod control doesn't add generate name from the owner reference.
                     if p.GenerateName != "" {
                         t.Errorf("Got pod generate name %s, want %s", p.GenerateName, "")
                     }
                     if p.Spec.Hostname != "" {
                         t.Errorf("Got pod hostname %q, want none", p.Spec.Hostname)
                     }
                 }
             }
             if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
                 t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName))
             }
             // Each create should have an accompanying ControllerRef.
             if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) {
                 t.Errorf("Unexpected number of ControllerRefs. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.ControllerRefs))
             }
             // Make sure the ControllerRefs are correct.
             for _, controllerRef := range fakePodControl.ControllerRefs {
                 if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
                     t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
                 }
                 if got, want := controllerRef.Kind, "Job"; got != want {
                     t.Errorf("controllerRef.Kind = %q, want %q", got, want)
                 }
                 if got, want := controllerRef.Name, job.Name; got != want {
                     t.Errorf("controllerRef.Name = %q, want %q", got, want)
                 }
                 if got, want := controllerRef.UID, job.UID; got != want {
                     t.Errorf("controllerRef.UID = %q, want %q", got, want)
                 }
                 if controllerRef.Controller == nil || *controllerRef.Controller != true {
                     t.Errorf("controllerRef.Controller is not set to true")
                 }
             }
             // validate status
             if actual.Status.Active != tc.expectedActive {
                 t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
             }
             if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" {
                 t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff)
             }
             if actual.Status.Succeeded != tc.expectedSucceeded {
                 t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
             }
             if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" {
                 t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
             }
             if actual.Status.Failed != tc.expectedFailed {
                 t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
             }
             if actual.Status.StartTime != nil && tc.suspend {
                 t.Error("Unexpected .status.startTime not nil when suspend is true")
             }
             if actual.Status.StartTime == nil && !tc.suspend {
                 t.Error("Missing .status.startTime")
             }
             // validate conditions
             if tc.expectedCondition != nil {
                 if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) {
                     t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions)
                 }
             } else {
                 if cond := hasTrueCondition(actual); cond != nil {
                     t.Errorf("Got condition %s, want none", *cond)
                 }
             }
             if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 {
                 t.Errorf("Unexpected conditions %v", actual.Status.Conditions)
             }
             // validate slow start
             expectedLimit := 0
             for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ {
                 expectedLimit += controller.SlowStartInitialBatchSize << pass
             }
             if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit {
                 t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
             }
-            wantPodPatches := 0
-            if wFinalizers {
-                wantPodPatches = tc.expectedPodPatches
-            }
-            if p := len(fakePodControl.Patches); p != wantPodPatches {
-                t.Errorf("Got %d pod patches, want %d", p, wantPodPatches)
-            }
-            })
-        }
+            if p := len(fakePodControl.Patches); p != tc.expectedPodPatches {
+                t.Errorf("Got %d pod patches, want %d", p, tc.expectedPodPatches)
+            }
+        })
     }
 }
@@ -922,94 +900,6 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI
     }
 }
 
-// TestSyncJobLegacyTracking makes sure that a Job is only tracked with
-// finalizers when the job has the annotation.
-func TestSyncJobLegacyTracking(t *testing.T) {
-    cases := map[string]struct {
-        job           batch.Job
-        wantUncounted bool
-        wantPatches   int
-    }{
-        "no annotation": {
-            job: batch.Job{
-                ObjectMeta: metav1.ObjectMeta{
-                    Name:      "foo",
-                    Namespace: "ns",
-                },
-                Spec: batch.JobSpec{
-                    Parallelism: pointer.Int32(1),
-                },
-            },
-        },
-        "tracking annotation": {
-            job: batch.Job{
-                ObjectMeta: metav1.ObjectMeta{
-                    Name:      "foo",
-                    Namespace: "ns",
-                    Annotations: map[string]string{
-                        batch.JobTrackingFinalizer: "",
-                    },
-                },
-                Spec: batch.JobSpec{
-                    Parallelism: pointer.Int32(1),
-                },
-            },
-            wantUncounted: true,
-        },
-        "different annotation": {
-            job: batch.Job{
-                ObjectMeta: metav1.ObjectMeta{
-                    Name:      "foo",
-                    Namespace: "ns",
-                    Annotations: map[string]string{
-                        "foo": "bar",
-                    },
-                },
-                Spec: batch.JobSpec{
-                    Parallelism: pointer.Int32(1),
-                },
-            },
-        },
-    }
-    for name, tc := range cases {
-        t.Run(name, func(t *testing.T) {
-            // Job manager setup.
-            clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-            manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
-            fakePodControl := controller.FakePodControl{}
-            manager.podControl = &fakePodControl
-            manager.podStoreSynced = alwaysReady
-            manager.jobStoreSynced = alwaysReady
-            jobPatches := 0
-            manager.patchJobHandler = func(context.Context, *batch.Job, []byte) error {
-                jobPatches++
-                return nil
-            }
-            sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job)
-
-            var actual *batch.Job
-            manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
-                actual = job
-                return job, nil
-            }
-
-            // Run.
-            _, err := manager.syncJob(context.TODO(), testutil.GetKey(&tc.job, t))
-            if err != nil {
-                t.Fatalf("Syncing job: %v", err)
-            }
-
-            // Checks.
-            if got := actual.Status.UncountedTerminatedPods != nil; got != tc.wantUncounted {
-                t.Errorf("Job got uncounted pods %t, want %t", got, tc.wantUncounted)
-            }
-            if jobPatches != tc.wantPatches {
-                t.Errorf("Sync did %d patches, want %d", jobPatches, tc.wantPatches)
-            }
-        })
-    }
-}
-
 func TestGetStatus(t *testing.T) {
     cases := map[string]struct {
         job batch.Job
@@ -1018,24 +908,6 @@ func TestGetStatus(t *testing.T) {
         wantSucceeded int32
         wantFailed int32
     }{
-        "without finalizers": {
-            job: batch.Job{
-                Status: batch.JobStatus{
-                    Succeeded: 1,
-                    Failed:    2,
-                },
-            },
-            pods: []*v1.Pod{
-                buildPod().uid("a").phase(v1.PodSucceeded).Pod,
-                buildPod().uid("b").phase(v1.PodSucceeded).Pod,
-                buildPod().uid("c").phase(v1.PodFailed).Pod,
-                buildPod().uid("d").phase(v1.PodFailed).Pod,
-                buildPod().uid("e").phase(v1.PodFailed).Pod,
-                buildPod().uid("f").phase(v1.PodRunning).Pod,
-            },
-            wantSucceeded: 2,
-            wantFailed:    3,
-        },
         "some counted": {
             job: batch.Job{
                 Status: batch.JobStatus{
@@ -1103,16 +975,6 @@
             wantFailed: 5,
         },
         "deleted pods": {
-            pods: []*v1.Pod{
-                buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod,
-                buildPod().uid("b").phase(v1.PodFailed).deletionTimestamp().Pod,
-                buildPod().uid("c").phase(v1.PodRunning).deletionTimestamp().Pod,
-                buildPod().uid("d").phase(v1.PodPending).deletionTimestamp().Pod,
-            },
-            wantSucceeded: 1,
-            wantFailed:    1,
-        },
-        "deleted pods, tracking with finalizers": {
             job: batch.Job{
                 Status: batch.JobStatus{
                     Succeeded: 1,
@@ -1134,10 +996,7 @@
     }
     for name, tc := range cases {
         t.Run(name, func(t *testing.T) {
-            var uncounted *uncountedTerminatedPods
-            if tc.job.Status.UncountedTerminatedPods != nil {
-                uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-            }
+            uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
             succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
             if succeeded != tc.wantSucceeded {
                 t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded)
@@ -1869,9 +1728,6 @@ func TestPastDeadlineJobFinished(t *testing.T) {
                 start := metav1.NewTime(fakeClock.Now())
                 job.Status.StartTime = &start
             }
-            job.Annotations = map[string]string{
-                batch.JobTrackingFinalizer: "",
-            }
 
             _, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
             if err != nil {
@@ -2063,7 +1919,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
     }
     testCases := map[string]struct {
-        wFinalizersExclusive *bool
         enableJobPodFailurePolicy bool
         enablePodDisruptionConditions bool
         job batch.Job
@@ -2949,7 +2804,6 @@
             wantStatusSucceeded: 0,
         },
         "terminating Pod considered failed when PodDisruptionConditions is disabled": {
-            wFinalizersExclusive: pointer.Bool(true),
             enableJobPodFailurePolicy: true,
             job: batch.Job{
                 TypeMeta: metav1.TypeMeta{Kind: "Job"},
@@ -3030,76 +2884,63 @@
             wantStatusActive: 1, // This is a replacement Pod: the terminating Pod is neither active nor failed.
         },
     }
-    for _, wFinalizers := range []bool{false, true} {
-        for name, tc := range testCases {
-            t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
-                if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers {
-                    t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
-                }
+    for name, tc := range testCases {
+        t.Run(name, func(t *testing.T) {
             defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
             defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
             clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
             manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
             fakePodControl := controller.FakePodControl{}
             manager.podControl = &fakePodControl
             manager.podStoreSynced = alwaysReady
             manager.jobStoreSynced = alwaysReady
             job := &tc.job
-            if wFinalizers {
-                job.Annotations = map[string]string{
-                    batch.JobTrackingFinalizer: "",
-                }
-            }
 
             actual := job
             manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
                 actual = job
                 return job, nil
             }
             sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
             for i, pod := range tc.pods {
                 pod := pod
                 pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job)
                 if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion {
                     pb.index(fmt.Sprintf("%v", i))
                 }
-                if wFinalizers {
-                    pb.trackingFinalizer()
-                }
+                pb = pb.trackingFinalizer()
                 sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod)
             }
 
             manager.syncJob(context.TODO(), testutil.GetKey(job, t))
 
             if tc.wantConditions != nil {
                 for _, wantCondition := range *tc.wantConditions {
                     conditions := getConditionsByType(actual.Status.Conditions, wantCondition.Type)
                     if len(conditions) != 1 {
                         t.Fatalf("Expected a single completion condition. Got %#v for type: %q", conditions, wantCondition.Type)
                     }
                     condition := *conditions[0]
                     if diff := cmp.Diff(wantCondition, condition, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
                         t.Errorf("Unexpected job condition (-want,+got):\n%s", diff)
                     }
                 }
             } else {
                 if cond := hasTrueCondition(actual); cond != nil {
                     t.Errorf("Got condition %s, want none", *cond)
                 }
             }
             // validate status
             if actual.Status.Active != tc.wantStatusActive {
                 t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.wantStatusActive, actual.Status.Active)
             }
             if actual.Status.Succeeded != tc.wantStatusSucceeded {
                 t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.wantStatusSucceeded, actual.Status.Succeeded)
             }
             if actual.Status.Failed != tc.wantStatusFailed {
                 t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed)
             }
         })
-        }
     }
 }
@@ -3291,50 +3132,46 @@ func TestGetPodsForJob(t *testing.T) {
         },
     }
     for name, tc := range cases {
-        for _, wFinalizers := range []bool{false, true} {
-            t.Run(fmt.Sprintf("%s, finalizers=%t", name, wFinalizers), func(t *testing.T) {
+        t.Run(name, func(t *testing.T) {
             job := job.DeepCopy()
             if tc.jobDeleted {
                 job.DeletionTimestamp = &metav1.Time{}
             }
             clientSet := fake.NewSimpleClientset(job, otherJob)
             jm, informer := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
             jm.podStoreSynced = alwaysReady
             jm.jobStoreSynced = alwaysReady
             cachedJob := job.DeepCopy()
             if tc.jobDeletedInCache {
                 cachedJob.DeletionTimestamp = &metav1.Time{}
             }
             informer.Batch().V1().Jobs().Informer().GetIndexer().Add(cachedJob)
             informer.Batch().V1().Jobs().Informer().GetIndexer().Add(otherJob)
             for _, p := range tc.pods {
                 informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
             }
 
-            pods, err := jm.getPodsForJob(context.TODO(), job, wFinalizers)
+            pods, err := jm.getPodsForJob(context.TODO(), job)
             if err != nil {
                 t.Fatalf("getPodsForJob() error: %v", err)
             }
             got := make([]string, len(pods))
             var gotFinalizer []string
             for i, p := range pods {
                 got[i] = p.Name
                 if hasJobTrackingFinalizer(p) {
                     gotFinalizer = append(gotFinalizer, p.Name)
                 }
             }
             sort.Strings(got)
             if diff := cmp.Diff(tc.wantPods, got); diff != "" {
                 t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff)
             }
-            if wFinalizers {
-                sort.Strings(gotFinalizer)
-                if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" {
-                    t.Errorf("Pods with finalizers (-want,+got):\n%s", diff)
-                }
-            }
+            sort.Strings(gotFinalizer)
+            if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" {
+                t.Errorf("Pods with finalizers (-want,+got):\n%s", diff)
+            }
         })
-        }
     }
 }
@@ -4337,9 +4174,6 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
     }
 
     job := newJob(2, 2, 6, batch.NonIndexedCompletion)
-    job.Annotations = map[string]string{
-        batch.JobTrackingFinalizer: "",
-    }
     sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
     pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
     podInformer := sharedInformers.Core().V1().Pods().Informer()

View File

@@ -687,9 +687,6 @@ func TestNonParallelJob(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to create Job: %v", err)
     }
-    if !hasJobTrackingAnnotation(jobObj) {
-        t.Error("apiserver created job without tracking annotation")
-    }
     validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
         Active: 1,
         Ready: pointer.Int32(0),
@@ -728,8 +725,7 @@
 
 func TestParallelJob(t *testing.T) {
     cases := map[string]struct {
-        trackWithFinalizers bool
-        enableReadyPods     bool
+        enableReadyPods bool
     }{
         "none": {},
         "ready pods": {
@@ -820,9 +816,7 @@
             }
             validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
             validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
-            if tc.trackWithFinalizers {
-                validateTerminatedPodsTrackingFinalizerMetric(t, 7)
-            }
+            validateTerminatedPodsTrackingFinalizerMetric(t, 7)
         })
     }
 }
@@ -911,9 +905,6 @@ func TestParallelJobWithCompletions(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to create Job: %v", err)
     }
-    if !hasJobTrackingAnnotation(jobObj) {
-        t.Error("apiserver created job without tracking annotation")
-    }
     want := podsByStatus{Active: 54}
     if tc.enableReadyPods {
         want.Ready = pointer.Int32Ptr(0)
@@ -990,9 +981,6 @@ func TestIndexedJob(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to create Job: %v", err)
     }
-    if !hasJobTrackingAnnotation(jobObj) {
-        t.Error("apiserver created job without tracking annotation")
-    }
     validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
         Active: 3,
         Ready: pointer.Int32(0),
@@ -1042,7 +1030,6 @@
 // We expect that large jobs are more commonly used as Indexed. And they are
 // also faster to track, as they need less API calls.
 func BenchmarkLargeIndexedJob(b *testing.B) {
-    defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
     closeFn, restConfig, clientSet, ns := setup(b, "indexed")
     restConfig.QPS = 100
     restConfig.Burst = 100
@@ -1144,9 +1131,6 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to create Job: %v", err)
    }
-    if !hasJobTrackingAnnotation(jobObj) {
-        t.Error("apiserver didn't add the tracking annotation")
-    }
     validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
         Active: 2,
         Ready: pointer.Int32(0),
@@ -1293,9 +1277,6 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to create Job: %v", err)
     }
-    if !hasJobTrackingAnnotation(jobObj) {
-        t.Error("apiserver didn't add the tracking annotation")
-    }
     validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
         Active: 1,
         Ready: pointer.Int32(0),
@ -1882,14 +1863,6 @@ func hasJobTrackingFinalizer(obj metav1.Object) bool {
return false return false
} }
func hasJobTrackingAnnotation(job *batchv1.Job) bool {
if job.Annotations == nil {
return false
}
_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
return ok
}
func setDuringTest(val *int, newVal int) func() { func setDuringTest(val *int, newVal int) func() {
origVal := *val origVal := *val
*val = newVal *val = newVal