From c856c412b97c44383e000081bb47955319f7980e Mon Sep 17 00:00:00 2001 From: Tomas Tormo Date: Tue, 27 Feb 2024 21:31:16 +0000 Subject: [PATCH] Add util pkg to commonize job util functions --- .../cronjob/cronjob_controllerv2.go | 15 +- pkg/controller/cronjob/utils.go | 25 --- pkg/controller/cronjob/utils_test.go | 53 ----- pkg/controller/job/job_controller.go | 9 +- pkg/controller/job/{ => util}/utils.go | 23 +- pkg/controller/job/util/utils_test.go | 209 ++++++++++++++++++ pkg/controller/job/utils_test.go | 82 ------- .../ttlafterfinished_controller.go | 2 +- test/e2e/apps/cronjob.go | 6 +- 9 files changed, 246 insertions(+), 178 deletions(-) rename pkg/controller/job/{ => util}/utils.go (62%) create mode 100644 pkg/controller/job/util/utils_test.go delete mode 100644 pkg/controller/job/utils_test.go diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index b83cf78836e..78812301e48 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -38,7 +38,7 @@ import ( batchv1informers "k8s.io/client-go/informers/batch/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - covev1client "k8s.io/client-go/kubernetes/typed/core/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" batchv1listers "k8s.io/client-go/listers/batch/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -47,6 +47,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/cronjob/metrics" + jobutil "k8s.io/kubernetes/pkg/controller/job/util" "k8s.io/utils/pointer" ) @@ -130,7 +131,7 @@ func (jm *ControllerV2) Run(ctx context.Context, workers int) { // Start event processing pipeline. jm.broadcaster.StartStructuredLogging(3) - jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")}) + jm.broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")}) defer jm.broadcaster.Shutdown() defer jm.queue.ShutDown() @@ -424,7 +425,7 @@ func (jm *ControllerV2) syncCronJob( for _, j := range jobs { childrenJobs[j.ObjectMeta.UID] = true found := inActiveList(cronJob, j.ObjectMeta.UID) - if !found && !IsJobFinished(j) { + if !found && !jobutil.IsJobFinished(j) { cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name) if err != nil { return nil, updateStatus, err @@ -438,12 +439,12 @@ func (jm *ControllerV2) syncCronJob( // This could happen if we crashed right after creating the Job and before updating the status, // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created // a job that they wanted us to adopt. - } else if found && IsJobFinished(j) { - _, status := getFinishedStatus(j) + } else if found && jobutil.IsJobFinished(j) { + _, condition := jobutil.FinishedCondition(j) deleteFromActiveList(cronJob, j.ObjectMeta.UID) - jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) + jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, condition: %v", j.Name, condition) updateStatus = true - } else if IsJobSucceeded(j) { + } else if jobutil.IsJobSucceeded(j) { // a job does not have to be in active list, as long as it has completed successfully, we will process the timestamp if cronJob.Status.LastSuccessfulTime == nil { cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index 681a27d18e3..8c215b0e333 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -277,31 +277,6 @@ func getTimeHashInMinutes(scheduledTime time.Time) int64 { return scheduledTime.Unix() / 60 } -func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) { - for _, c := range j.Status.Conditions { - if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue { - return true, c.Type - } - } - return false, "" -} - -// IsJobFinished returns whether or not a job has completed successfully or failed. -func IsJobFinished(j *batchv1.Job) bool { - isFinished, _ := getFinishedStatus(j) - return isFinished -} - -// IsJobSucceeded returns whether a job has completed successfully. -func IsJobSucceeded(j *batchv1.Job) bool { - for _, c := range j.Status.Conditions { - if c.Type == batchv1.JobComplete && c.Status == corev1.ConditionTrue { - return true - } - } - return false -} - // byJobStartTime sorts a list of jobs by start timestamp, using their names as a tie breaker. type byJobStartTime []*batchv1.Job diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go index 1791b585298..ac2162bee07 100644 --- a/pkg/controller/cronjob/utils_test.go +++ b/pkg/controller/cronjob/utils_test.go @@ -706,59 +706,6 @@ func TestNextScheduleTimeDuration(t *testing.T) { } } -func TestIsJobSucceeded(t *testing.T) { - tests := map[string]struct { - job batchv1.Job - wantResult bool - }{ - "job doesn't have any conditions": { - wantResult: false, - }, - "job has Complete=True condition": { - job: batchv1.Job{ - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobSuspended, - Status: v1.ConditionFalse, - }, - { - Type: batchv1.JobComplete, - Status: v1.ConditionTrue, - }, - }, - }, - }, - wantResult: true, - }, - "job has Complete=False condition": { - job: batchv1.Job{ - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobFailed, - Status: v1.ConditionTrue, - }, - { - Type: batchv1.JobComplete, - Status: v1.ConditionFalse, - }, - }, - }, - }, - wantResult: false, - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - gotResult := IsJobSucceeded(&tc.job) - if tc.wantResult != gotResult { - t.Errorf("unexpected result, want=%v, got=%v", tc.wantResult, gotResult) - } - }) - } -} - func topOfTheHour() *time.Time { T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") if err != nil { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 82d463f3af6..f258056e7c3 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -50,6 +50,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job/metrics" + "k8s.io/kubernetes/pkg/controller/job/util" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/clock" "k8s.io/utils/ptr" @@ -426,7 +427,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) return } job := jm.resolveControllerRef(pod.Namespace, controllerRef) - if job == nil || IsJobFinished(job) { + if job == nil || util.IsJobFinished(job) { // syncJob will not remove this finalizer. if hasFinalizer { jm.enqueueOrphanPod(pod) @@ -480,7 +481,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) { // The job shouldn't be marked as finished until all pod finalizers are removed. // This is a backup operation in this case. - if IsJobFinished(curJob) { + if util.IsJobFinished(curJob) { jm.cleanupPodFinalizers(curJob) } @@ -655,7 +656,7 @@ func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error { return nil } } - if job != nil && !IsJobFinished(job) { + if job != nil && !util.IsJobFinished(job) { // The pod was adopted. Do not remove finalizer. return nil } @@ -766,7 +767,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { job := *sharedJob.DeepCopy() // if job was finished previously, we don't want to redo the termination - if IsJobFinished(&job) { + if util.IsJobFinished(&job) { err := jm.podBackoffStore.removeBackoffRecord(key) if err != nil { // re-syncing here as the record has to be removed for finished/deleted jobs diff --git a/pkg/controller/job/utils.go b/pkg/controller/job/util/utils.go similarity index 62% rename from pkg/controller/job/utils.go rename to pkg/controller/job/util/utils.go index 571f8ebd4ab..8c6986586b0 100644 --- a/pkg/controller/job/utils.go +++ b/pkg/controller/job/util/utils.go @@ -14,18 +14,35 @@ See the License for the specific language governing permissions and limitations under the License. */ -package job +package util import ( batch "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) +// FinishedCondition returns true if a job is finished as well as the condition type indicating that. +// Returns false and no condition type otherwise +func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) { + for _, c := range j.Status.Conditions { + if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue { + return true, c.Type + } + } + return false, "" +} + // IsJobFinished checks whether the given Job has finished execution. // It does not discriminate between successful and failed terminations. func IsJobFinished(j *batch.Job) bool { + isFinished, _ := FinishedCondition(j) + return isFinished +} + +// IsJobSucceeded returns whether a job has completed successfully. +func IsJobSucceeded(j *batch.Job) bool { for _, c := range j.Status.Conditions { - if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue { + if c.Type == batch.JobComplete && c.Status == v1.ConditionTrue { return true } } diff --git a/pkg/controller/job/util/utils_test.go b/pkg/controller/job/util/utils_test.go new file mode 100644 index 00000000000..2e51b8531b2 --- /dev/null +++ b/pkg/controller/job/util/utils_test.go @@ -0,0 +1,209 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" +) + +func TestFinishedCondition(t *testing.T) { + tests := map[string]struct { + conditions []batch.JobCondition + wantJobFinished bool + wantConditionType batch.JobConditionType + }{ + "Job doesn't have any conditions": { + wantJobFinished: false, + wantConditionType: "", + }, + "Job is completed and condition is true": { + conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }, + }, + wantJobFinished: true, + wantConditionType: batch.JobComplete, + }, + "Job is completed and condition is false": { + conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: v1.ConditionFalse, + }, + }, + wantJobFinished: false, + wantConditionType: "", + }, + "Job is completed and condition is unknown": { + conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: v1.ConditionUnknown, + }, + }, + wantJobFinished: false, + wantConditionType: "", + }, + "Job has multiple conditions, one of them being complete and condition true": { + conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: v1.ConditionFalse, + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }, + { + Type: batch.JobFailed, + Status: v1.ConditionFalse, + }, + }, + wantJobFinished: true, + wantConditionType: batch.JobComplete, + }, + "Job is failed and condition is true": { + conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + }, + }, + wantJobFinished: true, + wantConditionType: batch.JobFailed, + }, + "Job is failed and condition is false": { + conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionFalse, + }, + }, + wantJobFinished: false, + wantConditionType: "", + }, + "Job is failed and condition is unknown": { + conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionUnknown, + }, + }, + wantJobFinished: false, + wantConditionType: "", + }, + "Job has multiple conditions, none of them has condition true": { + conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: v1.ConditionFalse, + }, + { + Type: batch.JobComplete, + Status: v1.ConditionFalse, + }, + { + Type: batch.JobFailed, + Status: v1.ConditionFalse, + }, + }, + wantJobFinished: false, + wantConditionType: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + job := &batch.Job{ + Status: batch.JobStatus{ + Conditions: test.conditions, + }, + } + + isJobFinished, conditionType := FinishedCondition(job) + if isJobFinished != test.wantJobFinished { + if test.wantJobFinished { + t.Error("Expected the job to be finished") + } else { + t.Error("Expected the job to be unfinished") + } + } + + if conditionType != test.wantConditionType { + t.Errorf("Unexpected job condition type. got: '%v', want: '%v'", conditionType, test.wantConditionType) + } + }) + } +} + +func TestIsJobSucceeded(t *testing.T) { + tests := map[string]struct { + job batch.Job + wantResult bool + }{ + "job doesn't have any conditions": { + wantResult: false, + }, + "job has Complete=True condition": { + job: batch.Job{ + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuspended, + Status: v1.ConditionFalse, + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + }, + wantResult: true, + }, + "job has Complete=False condition": { + job: batch.Job{ + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + }, + { + Type: batch.JobComplete, + Status: v1.ConditionFalse, + }, + }, + }, + }, + wantResult: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + gotResult := IsJobSucceeded(&tc.job) + if tc.wantResult != gotResult { + t.Errorf("unexpected result, want=%v, got=%v", tc.wantResult, gotResult) + } + }) + } +} diff --git a/pkg/controller/job/utils_test.go b/pkg/controller/job/utils_test.go deleted file mode 100644 index 7d8099027d4..00000000000 --- a/pkg/controller/job/utils_test.go +++ /dev/null @@ -1,82 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package job - -import ( - "testing" - - batch "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" -) - -func TestIsJobFinished(t *testing.T) { - testCases := map[string]struct { - conditionType batch.JobConditionType - conditionStatus v1.ConditionStatus - expectJobNotFinished bool - }{ - "Job is completed and condition is true": { - batch.JobComplete, - v1.ConditionTrue, - false, - }, - "Job is completed and condition is false": { - batch.JobComplete, - v1.ConditionFalse, - true, - }, - "Job is completed and condition is unknown": { - batch.JobComplete, - v1.ConditionUnknown, - true, - }, - "Job is failed and condition is true": { - batch.JobFailed, - v1.ConditionTrue, - false, - }, - "Job is failed and condition is false": { - batch.JobFailed, - v1.ConditionFalse, - true, - }, - "Job is failed and condition is unknown": { - batch.JobFailed, - v1.ConditionUnknown, - true, - }, - } - - for name, tc := range testCases { - job := &batch.Job{ - Status: batch.JobStatus{ - Conditions: []batch.JobCondition{{ - Type: tc.conditionType, - Status: tc.conditionStatus, - }}, - }, - } - - if tc.expectJobNotFinished == IsJobFinished(job) { - if tc.expectJobNotFinished { - t.Errorf("test name: %s, job was not expected to be finished", name) - } else { - t.Errorf("test name: %s, job was expected to be finished", name) - } - } - } -} diff --git a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go index f4659a7bbe7..dd353bff5e8 100644 --- a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go +++ b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go @@ -37,7 +37,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubectl/pkg/scheme" "k8s.io/kubernetes/pkg/controller" - jobutil "k8s.io/kubernetes/pkg/controller/job" + jobutil "k8s.io/kubernetes/pkg/controller/job/util" "k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics" "k8s.io/utils/clock" ) diff --git a/test/e2e/apps/cronjob.go b/test/e2e/apps/cronjob.go index bf96c302d1a..7566f6aab60 100644 --- a/test/e2e/apps/cronjob.go +++ b/test/e2e/apps/cronjob.go @@ -39,7 +39,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/retry" batchinternal "k8s.io/kubernetes/pkg/apis/batch" - "k8s.io/kubernetes/pkg/controller/job" + jobutil "k8s.io/kubernetes/pkg/controller/job/util" "k8s.io/kubernetes/test/e2e/framework" e2ejob "k8s.io/kubernetes/test/e2e/framework/job" e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" @@ -715,7 +715,7 @@ func waitForAnyFinishedJob(ctx context.Context, c clientset.Interface, ns string return false, err } for i := range jobs.Items { - if job.IsJobFinished(&jobs.Items[i]) { + if jobutil.IsJobFinished(&jobs.Items[i]) { return true, nil } } @@ -761,7 +761,7 @@ func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job { func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) { for i := range jobs.Items { j := jobs.Items[i] - if !job.IsJobFinished(&j) { + if !jobutil.IsJobFinished(&j) { active = append(active, &j) } else { finished = append(finished, &j)