mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
Merge pull request #109486 from alculquicondor/job-backofflimit
Fix job tracking leaving pods with finalizers
This commit is contained in:
commit
63a618a815
@ -384,7 +384,8 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
|
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
|
||||||
if job == nil {
|
if job == nil || IsJobFinished(job) {
|
||||||
|
// syncJob will not remove this finalizer.
|
||||||
if hasFinalizer {
|
if hasFinalizer {
|
||||||
jm.enqueueOrphanPod(pod)
|
jm.enqueueOrphanPod(pod)
|
||||||
}
|
}
|
||||||
@ -585,7 +586,7 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
|
|||||||
// Make sure the pod is still orphaned.
|
// Make sure the pod is still orphaned.
|
||||||
if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
|
if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
|
||||||
job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
|
job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
|
||||||
if job != nil {
|
if job != nil && !IsJobFinished(job) {
|
||||||
// The pod was adopted. Do not remove finalizer.
|
// The pod was adopted. Do not remove finalizer.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -725,7 +726,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
|
|||||||
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
|
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
|
||||||
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
||||||
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
|
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
|
||||||
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
|
satisfiedExpectations := jm.expectations.SatisfiedExpectations(key)
|
||||||
|
|
||||||
pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil)
|
pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -782,9 +783,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
|
|||||||
if uncounted == nil {
|
if uncounted == nil {
|
||||||
// Legacy behavior: pretend all active pods were successfully removed.
|
// Legacy behavior: pretend all active pods were successfully removed.
|
||||||
deleted = active
|
deleted = active
|
||||||
} else if deleted != active {
|
} else if deleted != active || !satisfiedExpectations {
|
||||||
// Can't declare the Job as finished yet, as there might be remaining
|
// Can't declare the Job as finished yet, as there might be remaining
|
||||||
// pod finalizers.
|
// pod finalizers or pods that are not in the informer's cache yet.
|
||||||
finishedCondition = nil
|
finishedCondition = nil
|
||||||
}
|
}
|
||||||
active -= deleted
|
active -= deleted
|
||||||
@ -792,7 +793,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
|
|||||||
manageJobErr = err
|
manageJobErr = err
|
||||||
} else {
|
} else {
|
||||||
manageJobCalled := false
|
manageJobCalled := false
|
||||||
if jobNeedsSync && job.DeletionTimestamp == nil {
|
if satisfiedExpectations && job.DeletionTimestamp == nil {
|
||||||
active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes)
|
active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes)
|
||||||
manageJobCalled = true
|
manageJobCalled = true
|
||||||
}
|
}
|
||||||
|
@ -196,6 +196,9 @@ func TestControllerSyncJob(t *testing.T) {
|
|||||||
wasSuspended bool
|
wasSuspended bool
|
||||||
suspend bool
|
suspend bool
|
||||||
|
|
||||||
|
// If set, it means that the case is exclusive to tracking with/without finalizers.
|
||||||
|
wFinalizersExclusive *bool
|
||||||
|
|
||||||
// pod setup
|
// pod setup
|
||||||
podControllerError error
|
podControllerError error
|
||||||
jobKeyForget bool
|
jobKeyForget bool
|
||||||
@ -480,6 +483,16 @@ func TestControllerSyncJob(t *testing.T) {
|
|||||||
expectedConditionReason: "BackoffLimitExceeded",
|
expectedConditionReason: "BackoffLimitExceeded",
|
||||||
expectedPodPatches: 1,
|
expectedPodPatches: 1,
|
||||||
},
|
},
|
||||||
|
"job failures, unsatisfied expectations": {
|
||||||
|
wFinalizersExclusive: pointer.Bool(true),
|
||||||
|
parallelism: 2,
|
||||||
|
completions: 5,
|
||||||
|
deleting: true,
|
||||||
|
failedPods: 1,
|
||||||
|
fakeExpectationAtCreation: 1,
|
||||||
|
expectedFailed: 1,
|
||||||
|
expectedPodPatches: 1,
|
||||||
|
},
|
||||||
"indexed job start": {
|
"indexed job start": {
|
||||||
parallelism: 2,
|
parallelism: 2,
|
||||||
completions: 5,
|
completions: 5,
|
||||||
@ -706,6 +719,9 @@ func TestControllerSyncJob(t *testing.T) {
|
|||||||
if wFinalizers && tc.podControllerError != nil {
|
if wFinalizers && tc.podControllerError != nil {
|
||||||
t.Skip("Can't track status if finalizers can't be removed")
|
t.Skip("Can't track status if finalizers can't be removed")
|
||||||
}
|
}
|
||||||
|
if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers {
|
||||||
|
t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
|
||||||
|
}
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
|
||||||
|
|
||||||
@ -2580,32 +2596,63 @@ func TestWatchOrphanPods(t *testing.T) {
|
|||||||
jobSynced = true
|
jobSynced = true
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create job but don't add it to the store.
|
|
||||||
testJob := newJob(2, 2, 6, batch.NonIndexedCompletion)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
go sharedInformers.Core().V1().Pods().Informer().Run(stopCh)
|
go sharedInformers.Core().V1().Pods().Informer().Run(stopCh)
|
||||||
go manager.Run(context.TODO(), 1)
|
go manager.Run(context.TODO(), 1)
|
||||||
|
|
||||||
orphanPod := buildPod().name("a").job(testJob).deletionTimestamp().trackingFinalizer().Pod
|
// Create job but don't add it to the store.
|
||||||
orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{})
|
cases := map[string]struct {
|
||||||
if err != nil {
|
job *batch.Job
|
||||||
t.Fatalf("Creating orphan pod: %v", err)
|
inCache bool
|
||||||
|
wantJobSynced bool
|
||||||
|
}{
|
||||||
|
"job_does_not_exist": {
|
||||||
|
job: newJob(2, 2, 6, batch.NonIndexedCompletion),
|
||||||
|
},
|
||||||
|
"orphan": {},
|
||||||
|
"job_finished": {
|
||||||
|
job: func() *batch.Job {
|
||||||
|
j := newJob(2, 2, 6, batch.NonIndexedCompletion)
|
||||||
|
j.Status.Conditions = append(j.Status.Conditions, batch.JobCondition{
|
||||||
|
Type: batch.JobComplete,
|
||||||
|
Status: v1.ConditionTrue,
|
||||||
|
})
|
||||||
|
return j
|
||||||
|
}(),
|
||||||
|
inCache: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
for name, tc := range cases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
jobSynced = false
|
||||||
|
if tc.inCache {
|
||||||
|
sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job)
|
||||||
|
}
|
||||||
|
|
||||||
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer()
|
||||||
p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{})
|
if tc.job != nil {
|
||||||
if err != nil {
|
podBuilder = podBuilder.job(tc.job)
|
||||||
return false, err
|
}
|
||||||
}
|
orphanPod := podBuilder.Pod
|
||||||
return !hasJobTrackingFinalizer(p), nil
|
orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{})
|
||||||
}); err != nil {
|
if err != nil {
|
||||||
t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
|
t.Fatalf("Creating orphan pod: %v", err)
|
||||||
}
|
}
|
||||||
if jobSynced {
|
|
||||||
t.Error("Tried to sync deleted job")
|
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
|
p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return !hasJobTrackingFinalizer(p), nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
|
||||||
|
}
|
||||||
|
if !tc.inCache && jobSynced {
|
||||||
|
t.Error("Tried to sync deleted job")
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -535,7 +535,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
|
|||||||
|
|
||||||
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
|
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||||
for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground} {
|
for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
|
||||||
t.Run(string(policy), func(t *testing.T) {
|
t.Run(string(policy), func(t *testing.T) {
|
||||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -575,29 +575,76 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to delete job: %v", err)
|
t.Fatalf("Failed to delete job: %v", err)
|
||||||
}
|
}
|
||||||
orphanPods := 0
|
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
|
||||||
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
|
|
||||||
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
|
|
||||||
LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
orphanPods = 0
|
|
||||||
for _, pod := range pods.Items {
|
|
||||||
if hasJobTrackingFinalizer(&pod) {
|
|
||||||
orphanPods++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return orphanPods == 0, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
|
|
||||||
t.Logf("Last saw %d orphan pods", orphanPods)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||||
|
|
||||||
|
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||||
|
defer closeFn()
|
||||||
|
ctx, cancel := startJobController(restConfig)
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Job tracking with finalizers requires fewer calls in Indexed mode,
|
||||||
|
// so it's more likely to process all finalizers before all the pods
|
||||||
|
// are visible.
|
||||||
|
mode := batchv1.IndexedCompletion
|
||||||
|
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||||
|
Spec: batchv1.JobSpec{
|
||||||
|
CompletionMode: &mode,
|
||||||
|
Completions: pointer.Int32(500),
|
||||||
|
Parallelism: pointer.Int32(500),
|
||||||
|
BackoffLimit: pointer.Int32(0),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not create job: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fail a pod ASAP.
|
||||||
|
err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||||
|
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not fail pod: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
validateJobFailed(ctx, t, clientSet, jobObj)
|
||||||
|
|
||||||
|
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||||
|
t.Helper()
|
||||||
|
orphanPods := 0
|
||||||
|
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||||
|
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
|
||||||
|
LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
orphanPods = 0
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if hasJobTrackingFinalizer(&pod) {
|
||||||
|
orphanPods++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return orphanPods == 0, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
|
||||||
|
t.Logf("Last saw %d orphan pods", orphanPods)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
|
func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
|
||||||
// Step 0: job created while feature is enabled.
|
// Step 0: job created while feature is enabled.
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||||
@ -974,16 +1021,26 @@ func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||||
|
t.Helper()
|
||||||
|
validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
|
||||||
|
}
|
||||||
|
|
||||||
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||||
|
t.Helper()
|
||||||
|
validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
|
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
|
j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to obtain updated Job: %v", err)
|
t.Fatalf("Failed to obtain updated Job: %v", err)
|
||||||
}
|
}
|
||||||
return getJobConditionStatus(ctx, j, batchv1.JobComplete) == v1.ConditionTrue, nil
|
return getJobConditionStatus(ctx, j, cond) == v1.ConditionTrue, nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Errorf("Waiting for Job to succeed: %v", err)
|
t.Errorf("Waiting for Job to have condition %s: %v", cond, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user