From eea7dca085af3b32199b2d4210d229c2dd624791 Mon Sep 17 00:00:00 2001 From: Harsha Narayana Date: Tue, 31 May 2022 00:39:40 +0530 Subject: [PATCH] GIT-110239: fix activeDeadlineSeconds enforcement bug GIT-110239: add additional tests with preset Status.StartTime --- pkg/controller/job/job_controller.go | 14 ++-- pkg/controller/job/job_controller_test.go | 92 +++++++++++++++-------- 2 files changed, 67 insertions(+), 39 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 863fa7e4cd0..d746ff7c16c 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -746,17 +746,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { ready = pointer.Int32(countReadyPods(activePods)) } - // Job first start. Set StartTime and start the ActiveDeadlineSeconds timer - // only if the job is not in the suspended state. + + // Job first start. Set StartTime only if the job is not in the suspended state. 
if job.Status.StartTime == nil && !jobSuspended(&job) { now := metav1.Now() job.Status.StartTime = &now - // enqueue a sync to check if job past ActiveDeadlineSeconds - if job.Spec.ActiveDeadlineSeconds != nil { - klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds", - key, *job.Spec.ActiveDeadlineSeconds) - jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second) - } } var manageJobErr error @@ -775,6 +769,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit") } else if pastActiveDeadline(&job) { finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline") + } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) { + syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(job.Status.StartTime.Time) + klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. 
Will sync this job again", "job", key, "nextSyncIn", syncDuration) + jm.queue.AddAfter(key, syncDuration) } var prevSucceededIndexes, succeededIndexes orderedIntervals diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 836cb780282..1639f148125 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -59,11 +59,11 @@ import ( var alwaysReady = func() bool { return true } -func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { +func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { j := &batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: metav1.ObjectMeta{ - Name: "foobar", + Name: name, UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, }, @@ -105,6 +105,10 @@ func newJob(parallelism, completions, backoffLimit int32, completionMode batch.C return j } +func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { + return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode) +} + func newControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) { sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) jm := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient) @@ -1828,40 +1832,66 @@ func hasTrueCondition(job *batch.Job) *batch.JobConditionType { } func TestSyncPastDeadlineJobFinished(t *testing.T) { - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + clientset := fake.NewSimpleClientset() manager, sharedInformerFactory := newControllerFromClient(clientset, 
controller.NoResyncPeriodFunc) - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - var actual *batch.Job - manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { - actual = job - return job, nil - } - job := newJob(1, 1, 6, batch.NonIndexedCompletion) - activeDeadlineSeconds := int64(10) - job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds - start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) - job.Status.StartTime = &start - job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")) - sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) - if err != nil { - t.Errorf("Unexpected error when syncing jobs %v", err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sharedInformerFactory.Start(ctx.Done()) + + go manager.Run(ctx, 1) + + tests := []struct { + name string + setStartTime bool + jobName string + }{ + { + name: "New job created without start time being set", + setStartTime: false, + jobName: "job1", + }, + { + name: "New job created with start time being set", + setStartTime: true, + jobName: "job2", + }, } - if !forget { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget) - } - if len(fakePodControl.Templates) != 0 { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) - } - if len(fakePodControl.DeletePodName) != 0 { - t.Errorf("Unexpected number of deletes. 
Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) - } - if actual != nil { - t.Error("Unexpected job modification") + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + job := newJobWithName(tc.jobName, 1, 1, 6, batch.NonIndexedCompletion) + activeDeadlineSeconds := int64(1) + job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds + if tc.setStartTime { + start := metav1.Unix(metav1.Now().Time.Unix()-1, 0) + job.Status.StartTime = &start + } + + _, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Could not create Job: %v", err) + } + + if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { + t.Fatalf("Failed to insert job in index: %v", err) + } + var j *batch.Job + err = wait.Poll(200*time.Millisecond, 3*time.Second, func() (done bool, err error) { + j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) + if err != nil { + return false, nil + } + if len(j.Status.Conditions) == 1 && j.Status.Conditions[0].Reason == "DeadlineExceeded" { + return true, nil + } + return false, nil + }) + if err != nil { + t.Errorf("Job failed to enforce activeDeadlineSeconds configuration. Expected condition with Reason 'DeadlineExceeded' was not found in %v", j.Status) + } + }) } }