Clean backoff record earlier

Once a job deletion event is received, the controller cleans the backoff records for that job before enqueueing it. This avoids a race condition in which syncJob() may incorrectly use stale backoff records for a newly created Job with the same key.

Co-authored-by: Michal Wozniak <michalwozniak@google.com>
parent f20adaecd4, commit d2be12ab76
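For context, the race hinges on the controller keying its in-memory backoff state by the Job's namespace/name, so a deleted Job and an immediately re-created replacement share the same key even though they are different objects. Below is a minimal, self-contained sketch of that key derivation using client-go's cache.MetaObjectToName (the same helper used in the diff); the namespaces, names, and UIDs are made up for illustration only.

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Two distinct Job objects (different UIDs) with the same namespace/name,
    // as happens when a Job is deleted and immediately re-created.
    oldJob := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test-job-0", UID: "uid-old"}}
    newJob := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test-job-0", UID: "uid-new"}}

    oldKey := cache.MetaObjectToName(oldJob).String() // "default/test-job-0"
    newKey := cache.MetaObjectToName(newJob).String() // "default/test-job-0"

    // Same key, so any backoff record left behind for oldJob would be
    // attributed to newJob unless it is removed on deletion.
    fmt.Println(oldKey == newKey) // true
}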
@@ -537,6 +537,12 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
        }
    }
    jm.enqueueLabelSelector(jobObj)

    key := cache.MetaObjectToName(jobObj).String()
    err := jm.podBackoffStore.removeBackoffRecord(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error removing backoff record %w", err))
    }
}

func (jm *Controller) enqueueLabelSelector(jobObj *batch.Job) {
@@ -2282,6 +2282,103 @@ func TestManagedBy_Reenabling(t *testing.T) {
        })
    }

// TestImmediateJobRecreation verifies that the replacement Job creates its Pods
// quickly after re-creation, see https://github.com/kubernetes/kubernetes/issues/132042.
func TestImmediateJobRecreation(t *testing.T) {
    // Set the backoff delay very high so that the test cannot pass by waiting out the backoff in the asserts below.
    t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*wait.ForeverTestTimeout))
    closeFn, restConfig, clientSet, ns := setup(t, "recreate-job-immediately")
    t.Cleanup(closeFn)
    ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
    t.Cleanup(cancel)

    baseJob := batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: ns.Name,
        },
        Spec: batchv1.JobSpec{
            Completions: ptr.To[int32](1),
            Parallelism: ptr.To[int32](1),
            Template: v1.PodTemplateSpec{
                Spec: v1.PodSpec{
                    Containers: []v1.Container{
                        {
                            Name:  "main-container",
                            Image: "foo",
                        },
                    },
                },
            },
        },
    }
    jobSpec := func(idx int) batchv1.Job {
        spec := baseJob.DeepCopy()
        spec.Name = fmt.Sprintf("test-job-%d", idx)
        return *spec
    }

    var jobObjs []*batchv1.Job
    // We create multiple Jobs to make the repro more likely. In particular, we need
    // more Jobs than the number of Job controller workers to make it very unlikely
    // that syncJob executes (and cleans the in-memory state) before the corresponding
    // replacement Jobs are created.
    for i := 0; i < 3; i++ {
        jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, ptr.To(jobSpec(i)))
        if err != nil {
            t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
        }
        jobObjs = append(jobObjs, jobObj)
    }

    for _, jobObj := range jobObjs {
        validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
            Active:      1,
            Ready:       ptr.To[int32](0),
            Terminating: ptr.To[int32](0),
        })

        if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
            t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodFailed, klog.KObj(jobObj))
        }

        // Wait for the Job status to account for the failed Pod.
        validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
            Failed:      1,
            Ready:       ptr.To[int32](0),
            Terminating: ptr.To[int32](0),
        })
    }

    for i := 0; i < len(jobObjs); i++ {
        jobObj := jobObjs[i]
        jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
        if err := jobClient.Delete(ctx, jobObj.Name, metav1.DeleteOptions{
            // Use propagationPolicy=background so that we don't need to wait for the job object to be gone.
            PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
        }); err != nil {
            t.Fatalf("Error %v when deleting the job %v", err, klog.KObj(jobObj))
        }

        // Re-create the job immediately.
        jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, ptr.To(jobSpec(i)))
        if err != nil {
            t.Fatalf("Error %q while creating the job %q", err, klog.KObj(jobObj))
        }
        jobObjs[i] = jobObj
    }

    // The total timeout (3*5s) is less than 2*ForeverTestTimeout.
    for _, jobObj := range jobObjs {
        // Wait at most 5s for Active=1. This assert verifies that the backoff
        // delay is not applied to the replacement instance of the Job.
        validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, podsByStatus{
            Active:      1,
            Ready:       ptr.To[int32](0),
            Terminating: ptr.To[int32](0),
        }, 5*time.Second)
    }
}

// TestManagedBy_RecreatedJob verifies that the Job controller skips
// reconciliation of a job with the managedBy field when it is a recreated job
// and there is still a pending sync queued for the previous job.
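Note that setDurationDuringTest and the validateJobsPodsStatusOnly* helpers come from the integration test package and are not part of this diff. For illustration only, a plausible sketch of setDurationDuringTest, assuming it swaps a package-level duration and returns a restore function suitable for t.Cleanup:

package job

import "time"

// Sketch (assumption): override a package-level duration such as
// jobcontroller.DefaultJobPodFailureBackOff for the lifetime of a test and
// return a cleanup function that restores the original value.
func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
    origVal := *val
    *val = newVal
    return func() {
        *val = origVal
    }
}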