Merge pull request #114647 from kannon92/remove-legacy-job-tracking-job-controller

Removing Legacy Job Tracking Code
commit 457341c3d4
Authored by Kubernetes Prow Robot on 2023-01-12 04:38:53 -08:00; committed by GitHub
5 changed files with 284 additions and 592 deletions
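For context, this change drops the per-Job opt-in annotation path: the controller now assumes every Job is tracked through the Pod tracking finalizer. As a rough, self-contained sketch (not the controller's actual helper), the check that replaces the old annotation test looks like the following; hasTrackingFinalizer and the sample Pod are illustrative, and only the JobTrackingFinalizer constant comes from k8s.io/api/batch/v1.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// hasTrackingFinalizer reports whether a Pod still carries the Job tracking
// finalizer, meaning the Job controller has not yet accounted for it.
func hasTrackingFinalizer(p *v1.Pod) bool {
	for _, fin := range p.Finalizers {
		if fin == batchv1.JobTrackingFinalizer {
			return true
		}
	}
	return false
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:       "sample",
		Finalizers: []string{batchv1.JobTrackingFinalizer},
	}}
	fmt.Println(hasTrackingFinalizer(pod)) // true
}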

View File

@ -52,17 +52,13 @@ type orderedIntervals []interval
// empty list if this Job is not tracked with finalizers. The new list includes
// the indexes that succeeded since the last sync.
func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
var prevIntervals orderedIntervals
withFinalizers := hasJobTrackingAnnotation(job)
if withFinalizers {
prevIntervals = succeededIndexesFromJob(job)
}
prevIntervals := succeededIndexesFromJob(job)
newSucceeded := sets.NewInt()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
// Succeeded Pod with valid index and, if tracking with finalizers,
// has a finalizer (meaning that it is not counted yet).
if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && (!withFinalizers || hasJobTrackingFinalizer(p)) {
if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) {
newSucceeded.Insert(ix)
}
}

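For the hunk above: calculateSucceededIndexes folds the completion indexes of succeeded Pods into ordered, inclusive intervals, so {2, 3} and {5, 7} mean indexes 2-3 and 5-7 succeeded. Below is a standalone sketch of that folding step, with an illustrative interval type and toIntervals helper rather than the controller's own code.

package main

import (
	"fmt"
	"sort"
)

type interval struct{ first, last int }

// toIntervals collapses a set of succeeded completion indexes into ordered,
// inclusive intervals, e.g. {2,3,5,6,7} -> [{2 3} {5 7}].
func toIntervals(indexes map[int]struct{}) []interval {
	keys := make([]int, 0, len(indexes))
	for ix := range indexes {
		keys = append(keys, ix)
	}
	sort.Ints(keys)
	var out []interval
	for _, ix := range keys {
		if n := len(out); n > 0 && out[n-1].last+1 == ix {
			out[n-1].last = ix // extends the previous interval
			continue
		}
		out = append(out, interval{first: ix, last: ix})
	}
	return out
}

func main() {
	succeeded := map[int]struct{}{2: {}, 3: {}, 5: {}, 6: {}, 7: {}}
	fmt.Println(toIntervals(succeeded)) // [{2 3} {5 7}]
}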
View File

@ -32,7 +32,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
prevSucceeded string
pods []indexPhase
completions int32
trackingWithFinalizers bool
wantStatusIntervals orderedIntervals
wantIntervals orderedIntervals
}{
@ -65,19 +64,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
completions: 8,
wantIntervals: []interval{{2, 3}, {5, 7}},
},
"one interval, ignore previous": {
prevSucceeded: "3-5",
pods: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodFailed},
{"1", v1.PodSucceeded},
{"2", v1.PodSucceeded},
{"2", v1.PodSucceeded},
{"3", v1.PodFailed},
},
completions: 4,
wantIntervals: []interval{{0, 2}},
},
"one index and one interval": {
pods: []indexPhase{
{"0", v1.PodSucceeded},
@ -109,14 +95,12 @@ func TestCalculateSucceededIndexes(t *testing.T) {
"prev interval out of range": {
prevSucceeded: "0-5,8-10",
completions: 8,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{{0, 5}},
wantIntervals: []interval{{0, 5}},
},
"prev interval partially out of range": {
prevSucceeded: "0-5,8-10",
completions: 10,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{{0, 5}, {8, 9}},
wantIntervals: []interval{{0, 5}, {8, 9}},
},
@ -128,7 +112,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"8", v1.PodSucceeded},
},
completions: 13,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{0, 0},
{4, 5},
@ -150,7 +133,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"8", v1.PodSucceeded},
},
completions: 9,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{3, 4},
{6, 6},
@ -168,7 +150,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"6", v1.PodSucceeded},
},
completions: 9,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{2, 2},
{7, 8},
@ -187,7 +168,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"9", v1.PodSucceeded},
},
completions: 10,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{2, 7},
},
@ -203,7 +183,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"3", v1.PodSucceeded},
},
completions: 4,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{0, 0},
},
@ -223,11 +202,6 @@ func TestCalculateSucceededIndexes(t *testing.T) {
Completions: pointer.Int32(tc.completions),
},
}
if tc.trackingWithFinalizers {
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
}
pods := hollowPodsWithIndexPhase(tc.pods)
for _, p := range pods {
p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)

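The prevSucceeded strings in the test cases above ("3-5", "0-5,8-10") use the compact representation stored in the Job's .status.completedIndexes. The following is a minimal illustrative parser for that format, not succeededIndexesFromJob itself, and it assumes well-formed input.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

type interval struct{ first, last int }

// parseIndexes reads the compact completed-indexes format, e.g. "0-5,8-10"
// meaning indexes 0..5 and 8..10; a bare number like "3" is a single index.
func parseIndexes(s string) ([]interval, error) {
	if s == "" {
		return nil, nil
	}
	var out []interval
	for _, part := range strings.Split(s, ",") {
		lo, hi, isRange := strings.Cut(part, "-")
		first, err := strconv.Atoi(lo)
		if err != nil {
			return nil, err
		}
		last := first
		if isRange {
			if last, err = strconv.Atoi(hi); err != nil {
				return nil, err
			}
		}
		out = append(out, interval{first: first, last: last})
	}
	return out, nil
}

func main() {
	ivs, _ := parseIndexes("0-5,8-10")
	fmt.Println(ivs) // [{0 5} {8 10}]
}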
View File

@ -614,9 +614,9 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning, adding tracking
// finalizers, if enabled.
// finalizers.
// Note that the returned Pods are pointers into the cache.
func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) {
func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@ -639,14 +639,10 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinal
}
return fresh, nil
})
var finalizers []string
if withFinalizers {
finalizers = append(finalizers, batch.JobTrackingFinalizer)
}
cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...)
cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, batch.JobTrackingFinalizer)
// When adopting Pods, this operation adds an ownerRef and finalizers.
pods, err = cm.ClaimPods(ctx, pods)
if err != nil || !withFinalizers {
if err != nil {
return pods, err
}
// Set finalizer on adopted pods for the remaining calculations.
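The adoption step above sets the tracking finalizer on any claimed Pod that does not already have it. A small sketch of that idempotent append, standing in for the controller's appendJobCompletionFinalizerIfNotFound:

// appendFinalizerIfMissing adds the finalizer only if it is not already present.
func appendFinalizerIfMissing(finalizers []string, finalizer string) []string {
	for _, fin := range finalizers {
		if fin == finalizer {
			// Already present; avoid duplicating the finalizer.
			return finalizers
		}
	}
	return append(finalizers, finalizer)
}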
@ -718,23 +714,18 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
}()
var expectedRmFinalizers sets.String
var uncounted *uncountedTerminatedPods
if hasJobTrackingAnnotation(&job) {
klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
if job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
}
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
satisfiedExpectations := jm.expectations.SatisfiedExpectations(key)
pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil)
pods, err := jm.getPodsForJob(ctx, &job)
if err != nil {
return false, err
}
@ -767,13 +758,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
} else if failJobMessage := getFailJobMessage(&job, pods, uncounted.Failed()); failJobMessage != nil {
if uncounted != nil {
// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
} else {
// Prepare the Failed job condition for the legacy path without finalizers (don't use the interim FailureTarget condition).
finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
}
}
}
if finishedCondition == nil {
@ -799,16 +785,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
// Remove active pods if Job failed.
if finishedCondition != nil {
deleted, err := jm.deleteActivePods(ctx, &job, activePods)
if uncounted == nil {
// Legacy behavior: pretend all active pods were successfully removed.
deleted = active
} else if deleted != active || !satisfiedExpectations {
if deleted != active || !satisfiedExpectations {
// Can't declare the Job as finished yet, as there might be remaining
// pod finalizers or pods that are not in the informer's cache yet.
finishedCondition = nil
}
active -= deleted
failed += deleted
manageJobErr = err
} else {
manageJobCalled := false
@ -872,7 +854,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
// In this case, we should clear the backoff delay.
forget = job.Status.Succeeded < succeeded
if uncounted != nil {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
job.Status.Active = active
job.Status.Ready = ready
@ -886,42 +867,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
forget = true
return forget, manageJobErr
}
// Legacy path: tracking without finalizers.
// Ensure that there are no leftover tracking finalizers.
if err := jm.removeTrackingFinalizersFromAllPods(ctx, pods); err != nil {
return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
}
// no need to update the job if the status hasn't changed since last time
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || !equalReady(job.Status.Ready, ready) || suspendCondChanged || finishedCondition != nil {
job.Status.Active = active
job.Status.Succeeded = succeeded
job.Status.Failed = failed
job.Status.Ready = ready
if isIndexedJob(&job) {
job.Status.CompletedIndexes = succeededIndexes.String()
}
job.Status.UncountedTerminatedPods = nil
jobFinished := jm.enactJobFinished(&job, finishedCondition)
if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
return forget, err
}
if jobFinished {
jm.recordJobFinished(&job, finishedCondition)
}
if jobHasNewFailure && !IsJobFinished(&job) {
// returning an error will re-enqueue Job after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
forget = true
}
return forget, manageJobErr
}
@ -986,23 +931,6 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
return successfulDeletes, errorFromChannel(errCh)
}
// removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called
// when Job tracking with finalizers is disabled.
func (jm *Controller) removeTrackingFinalizersFromAllPods(ctx context.Context, pods []*v1.Pod) error {
var podsWithFinalizer []*v1.Pod
for _, pod := range pods {
if hasJobTrackingFinalizer(pod) {
podsWithFinalizer = append(podsWithFinalizer, pod)
}
}
if len(podsWithFinalizer) == 0 {
return nil
}
// Tracking with finalizers is disabled, no need to set expectations.
_, err := jm.removeTrackingFinalizerFromPods(ctx, "", podsWithFinalizer)
return err
}
// trackJobStatusAndRemoveFinalizers does:
// 1. Add finished Pods to .status.uncountedTerminatedPods
// 2. Remove the finalizers from the Pods if they completed or were removed
@ -1053,7 +981,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
// considerTerminated || job.DeletionTimestamp != nil
considerTerminated = podutil.IsPodTerminal(pod) ||
finishedCond != nil || // The Job is terminating. Any running Pod is considered failed.
isPodFailed(pod, job, true /* using finalizers */)
isPodFailed(pod, job)
}
if podutil.IsPodTerminal(pod) || considerTerminated || job.DeletionTimestamp != nil {
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
@ -1361,7 +1289,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
return nil
}
for _, p := range pods {
if isPodFailed(p, job, uncounted != nil) {
if isPodFailed(p, job) {
jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
if jobFailureMessage != nil {
return jobFailureMessage
@ -1374,22 +1302,20 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
// getStatus returns number of succeeded and failed pods running a job. The number
// of failed pods can be affected by the podFailurePolicy.
func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) {
if uncounted != nil {
succeeded = job.Status.Succeeded
failed = job.Status.Failed
}
succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
return p.Status.Phase == v1.PodSucceeded
}))
failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
if !isPodFailed(p, job, uncounted != nil) {
if !isPodFailed(p, job) {
return false
}
_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
return countFailed
} else {
return isPodFailed(p, job, uncounted != nil)
return isPodFailed(p, job)
}
}))
return succeeded, failed
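In getStatus above, the counts already recorded in Job status are extended with terminal Pods that still hold the tracking finalizer and are not yet listed in uncountedTerminatedPods or expected to lose their finalizer. A simplified sketch of that counting rule, using illustrative types (podInfo, countNewTerminations) rather than the controller's:

type podInfo struct {
	uid          string
	phase        string // "Succeeded", "Failed", or anything else (still running)
	hasFinalizer bool   // still carries the job-tracking finalizer
}

// countNewTerminations starts from the totals already in Job status and adds
// terminal Pods that still hold the finalizer and are not in skip (UIDs already
// recorded in uncountedTerminatedPods or expected to lose their finalizer).
func countNewTerminations(statusSucceeded, statusFailed int32, pods []podInfo, skip map[string]bool) (succeeded, failed int32) {
	succeeded, failed = statusSucceeded, statusFailed
	for _, p := range pods {
		if !p.hasFinalizer || skip[p.uid] {
			continue
		}
		switch p.phase {
		case "Succeeded":
			succeeded++
		case "Failed":
			failed++
		}
	}
	return succeeded, failed
}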
@ -1487,9 +1413,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
if isIndexedJob(job) {
addCompletionIndexEnvVariables(podTemplate)
}
if hasJobTrackingAnnotation(job) {
podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
}
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
@ -1646,14 +1570,6 @@ func getCompletionMode(job *batch.Job) string {
return string(batch.NonIndexedCompletion)
}
func hasJobTrackingAnnotation(job *batch.Job) bool {
if job.Annotations == nil {
return false
}
_, ok := job.Annotations[batch.JobTrackingFinalizer]
return ok
}
func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string {
for _, fin := range finalizers {
if fin == batch.JobTrackingFinalizer {
@ -1739,7 +1655,7 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
return list, false
}
func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool {
func isPodFailed(p *v1.Pod, job *batch.Job) bool {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
// When PodDisruptionConditions is enabled, orphan Pods and unschedulable
// terminating Pods are marked as Failed. So we only need to check the phase.
@ -1751,10 +1667,9 @@ func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool {
if p.Status.Phase == v1.PodFailed {
return true
}
// When tracking with finalizers: counting deleted Pods as failures to
// account for orphan Pods that never have a chance to reach the Failed
// phase.
return wFinalizers && p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
// Count deleted Pods as failures to account for orphan Pods that
// never have a chance to reach the Failed phase.
return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
}
func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {

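One behavioral consequence of the simplified isPodFailed above: a Pod that is deleted before reaching a terminal phase now always counts as failed, where previously this applied only when tracking with finalizers. A small self-contained example follows; isFailedForJob is an illustrative mirror of that logic, omitting the feature-gated pod-failure-policy branch.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isFailedForJob mirrors the simplified rule: Failed phase, or deleted without
// having reached Succeeded, counts as a failure.
func isFailedForJob(p *v1.Pod) bool {
	if p.Status.Phase == v1.PodFailed {
		return true
	}
	return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
}

func main() {
	now := metav1.Now()
	deleting := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "orphan", DeletionTimestamp: &now},
		Status:     v1.PodStatus{Phase: v1.PodRunning},
	}
	fmt.Println(isFailedForJob(deleting)) // true: deleted before reaching a terminal phase
}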
View File

@ -141,9 +141,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
for i := 0; i < count; i++ {
newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
newPod.Status = v1.PodStatus{Phase: status}
if hasJobTrackingAnnotation(job) {
newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
}
pods = append(pods, newPod)
}
return pods
@ -184,9 +182,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
}
p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index)
}
if hasJobTrackingAnnotation(job) {
p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
}
podIndexer.Add(p)
}
}
@ -207,9 +203,6 @@ func TestControllerSyncJob(t *testing.T) {
wasSuspended bool
suspend bool
// If set, it means that the case is exclusive to tracking with/without finalizers.
wFinalizersExclusive *bool
// pod setup
podControllerError error
jobKeyForget bool
@ -495,7 +488,6 @@ func TestControllerSyncJob(t *testing.T) {
expectedPodPatches: 1,
},
"job failures, unsatisfied expectations": {
wFinalizersExclusive: pointer.Bool(true),
parallelism: 2,
completions: 5,
deleting: true,
@ -725,14 +717,10 @@ func TestControllerSyncJob(t *testing.T) {
}
for name, tc := range testCases {
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("%s, finalizers=%t", name, wFinalizers), func(t *testing.T) {
if wFinalizers && tc.podControllerError != nil {
t.Run(name, func(t *testing.T) {
if tc.podControllerError != nil {
t.Skip("Can't track status if finalizers can't be removed")
}
if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers {
t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
}
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
// job manager setup
@ -758,11 +746,6 @@ func TestControllerSyncJob(t *testing.T) {
if tc.wasSuspended {
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now()))
}
if wFinalizers {
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
}
if tc.deleting {
now := metav1.Now()
job.DeletionTimestamp = &now
@ -885,16 +868,11 @@ func TestControllerSyncJob(t *testing.T) {
if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit {
t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
}
wantPodPatches := 0
if wFinalizers {
wantPodPatches = tc.expectedPodPatches
}
if p := len(fakePodControl.Patches); p != wantPodPatches {
t.Errorf("Got %d pod patches, want %d", p, wantPodPatches)
if p := len(fakePodControl.Patches); p != tc.expectedPodPatches {
t.Errorf("Got %d pod patches, want %d", p, tc.expectedPodPatches)
}
})
}
}
}
func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int, jobName string) {
@ -922,94 +900,6 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI
}
}
// TestSyncJobLegacyTracking makes sure that a Job is only tracked with
// finalizers when the job has the annotation.
func TestSyncJobLegacyTracking(t *testing.T) {
cases := map[string]struct {
job batch.Job
wantUncounted bool
wantPatches int
}{
"no annotation": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "ns",
},
Spec: batch.JobSpec{
Parallelism: pointer.Int32(1),
},
},
},
"tracking annotation": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "ns",
Annotations: map[string]string{
batch.JobTrackingFinalizer: "",
},
},
Spec: batch.JobSpec{
Parallelism: pointer.Int32(1),
},
},
wantUncounted: true,
},
"different annotation": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "ns",
Annotations: map[string]string{
"foo": "bar",
},
},
Spec: batch.JobSpec{
Parallelism: pointer.Int32(1),
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
// Job manager setup.
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
jobPatches := 0
manager.patchJobHandler = func(context.Context, *batch.Job, []byte) error {
jobPatches++
return nil
}
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job)
var actual *batch.Job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job
return job, nil
}
// Run.
_, err := manager.syncJob(context.TODO(), testutil.GetKey(&tc.job, t))
if err != nil {
t.Fatalf("Syncing job: %v", err)
}
// Checks.
if got := actual.Status.UncountedTerminatedPods != nil; got != tc.wantUncounted {
t.Errorf("Job got uncounted pods %t, want %t", got, tc.wantUncounted)
}
if jobPatches != tc.wantPatches {
t.Errorf("Sync did %d patches, want %d", jobPatches, tc.wantPatches)
}
})
}
}
func TestGetStatus(t *testing.T) {
cases := map[string]struct {
job batch.Job
@ -1018,24 +908,6 @@ func TestGetStatus(t *testing.T) {
wantSucceeded int32
wantFailed int32
}{
"without finalizers": {
job: batch.Job{
Status: batch.JobStatus{
Succeeded: 1,
Failed: 2,
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("b").phase(v1.PodSucceeded).Pod,
buildPod().uid("c").phase(v1.PodFailed).Pod,
buildPod().uid("d").phase(v1.PodFailed).Pod,
buildPod().uid("e").phase(v1.PodFailed).Pod,
buildPod().uid("f").phase(v1.PodRunning).Pod,
},
wantSucceeded: 2,
wantFailed: 3,
},
"some counted": {
job: batch.Job{
Status: batch.JobStatus{
@ -1103,16 +975,6 @@ func TestGetStatus(t *testing.T) {
wantFailed: 5,
},
"deleted pods": {
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod,
buildPod().uid("b").phase(v1.PodFailed).deletionTimestamp().Pod,
buildPod().uid("c").phase(v1.PodRunning).deletionTimestamp().Pod,
buildPod().uid("d").phase(v1.PodPending).deletionTimestamp().Pod,
},
wantSucceeded: 1,
wantFailed: 1,
},
"deleted pods, tracking with finalizers": {
job: batch.Job{
Status: batch.JobStatus{
Succeeded: 1,
@ -1134,10 +996,7 @@ func TestGetStatus(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
var uncounted *uncountedTerminatedPods
if tc.job.Status.UncountedTerminatedPods != nil {
uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
}
uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
if succeeded != tc.wantSucceeded {
t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded)
@ -1869,9 +1728,6 @@ func TestPastDeadlineJobFinished(t *testing.T) {
start := metav1.NewTime(fakeClock.Now())
job.Status.StartTime = &start
}
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
_, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
if err != nil {
@ -2063,7 +1919,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
}
testCases := map[string]struct {
wFinalizersExclusive *bool
enableJobPodFailurePolicy bool
enablePodDisruptionConditions bool
job batch.Job
@ -2949,7 +2804,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
wantStatusSucceeded: 0,
},
"terminating Pod considered failed when PodDisruptionConditions is disabled": {
wFinalizersExclusive: pointer.Bool(true),
enableJobPodFailurePolicy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
@ -3030,12 +2884,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
wantStatusActive: 1, // This is a replacement Pod: the terminating Pod is neither active nor failed.
},
}
for _, wFinalizers := range []bool{false, true} {
for name, tc := range testCases {
t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers {
t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
}
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@ -3046,12 +2896,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
manager.jobStoreSynced = alwaysReady
job := &tc.job
if wFinalizers {
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
}
actual := job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job
@ -3064,9 +2908,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion {
pb.index(fmt.Sprintf("%v", i))
}
if wFinalizers {
pb.trackingFinalizer()
}
pb = pb.trackingFinalizer()
sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod)
}
@ -3100,7 +2942,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
}
})
}
}
}
func TestSyncJobUpdateRequeue(t *testing.T) {
@ -3291,8 +3132,7 @@ func TestGetPodsForJob(t *testing.T) {
},
}
for name, tc := range cases {
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("%s, finalizers=%t", name, wFinalizers), func(t *testing.T) {
t.Run(name, func(t *testing.T) {
job := job.DeepCopy()
if tc.jobDeleted {
job.DeletionTimestamp = &metav1.Time{}
@ -3311,7 +3151,7 @@ func TestGetPodsForJob(t *testing.T) {
informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
}
pods, err := jm.getPodsForJob(context.TODO(), job, wFinalizers)
pods, err := jm.getPodsForJob(context.TODO(), job)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
@ -3327,15 +3167,12 @@ func TestGetPodsForJob(t *testing.T) {
if diff := cmp.Diff(tc.wantPods, got); diff != "" {
t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff)
}
if wFinalizers {
sort.Strings(gotFinalizer)
if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" {
t.Errorf("Pods with finalizers (-want,+got):\n%s", diff)
}
}
})
}
}
}
func TestAddPod(t *testing.T) {
@ -4337,9 +4174,6 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
}
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
podInformer := sharedInformers.Core().V1().Pods().Informer()

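In the updated tests above, every hollow Pod gets the tracking finalizer unconditionally, since the no-finalizer variants are gone. An illustrative helper showing that setup, not part of the test file, assuming the usual v1, metav1, and batchv1 import aliases:

// newTrackedTestPod builds a bare test Pod in the given phase with the
// Job tracking finalizer attached.
func newTrackedTestPod(name string, phase v1.PodPhase) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
			// Every test Pod now carries the Job tracking finalizer.
			Finalizers: []string{batchv1.JobTrackingFinalizer},
		},
		Status: v1.PodStatus{Phase: phase},
	}
}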
View File

@ -687,9 +687,6 @@ func TestNonParallelJob(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver created job without tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
@ -728,7 +725,6 @@ func TestNonParallelJob(t *testing.T) {
func TestParallelJob(t *testing.T) {
cases := map[string]struct {
trackWithFinalizers bool
enableReadyPods bool
}{
"none": {},
@ -820,9 +816,7 @@ func TestParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
if tc.trackWithFinalizers {
validateTerminatedPodsTrackingFinalizerMetric(t, 7)
}
})
}
}
@ -911,9 +905,6 @@ func TestParallelJobWithCompletions(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver created job without tracking annotation")
}
want := podsByStatus{Active: 54}
if tc.enableReadyPods {
want.Ready = pointer.Int32Ptr(0)
@ -990,9 +981,6 @@ func TestIndexedJob(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver created job without tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Ready: pointer.Int32(0),
@ -1042,7 +1030,6 @@ func TestIndexedJob(t *testing.T) {
// We expect that large jobs are more commonly used as Indexed. And they are
// also faster to track, as they need less API calls.
func BenchmarkLargeIndexedJob(b *testing.B) {
defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(b, "indexed")
restConfig.QPS = 100
restConfig.Burst = 100
@ -1144,9 +1131,6 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver didn't add the tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Ready: pointer.Int32(0),
@ -1293,9 +1277,6 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver didn't add the tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
@ -1882,14 +1863,6 @@ func hasJobTrackingFinalizer(obj metav1.Object) bool {
return false
}
func hasJobTrackingAnnotation(job *batchv1.Job) bool {
if job.Annotations == nil {
return false
}
_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
return ok
}
func setDuringTest(val *int, newVal int) func() {
origVal := *val
*val = newVal