Merge pull request #123537 from kaisoz/commonize-job-util-functions

Add the util pkg to commonize job util functions
This commit is contained in:
Kubernetes Prow Robot 2024-05-07 16:59:28 -07:00 committed by GitHub
commit d2e6c51b05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 246 additions and 178 deletions

View File

@ -38,7 +38,7 @@ import (
batchv1informers "k8s.io/client-go/informers/batch/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
covev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
batchv1listers "k8s.io/client-go/listers/batch/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -47,6 +47,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/cronjob/metrics"
jobutil "k8s.io/kubernetes/pkg/controller/job/util"
"k8s.io/utils/pointer"
)
@ -135,7 +136,7 @@ func (jm *ControllerV2) Run(ctx context.Context, workers int) {
// Start event processing pipeline.
jm.broadcaster.StartStructuredLogging(3)
jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
jm.broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
defer jm.broadcaster.Shutdown()
defer jm.queue.ShutDown()
@ -429,7 +430,7 @@ func (jm *ControllerV2) syncCronJob(
for _, j := range jobs {
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(cronJob, j.ObjectMeta.UID)
if !found && !IsJobFinished(j) {
if !found && !jobutil.IsJobFinished(j) {
cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
if err != nil {
return nil, updateStatus, err
@ -443,12 +444,12 @@ func (jm *ControllerV2) syncCronJob(
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.
} else if found && IsJobFinished(j) {
_, status := getFinishedStatus(j)
} else if found && jobutil.IsJobFinished(j) {
_, condition := jobutil.FinishedCondition(j)
deleteFromActiveList(cronJob, j.ObjectMeta.UID)
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, condition: %v", j.Name, condition)
updateStatus = true
} else if IsJobSucceeded(j) {
} else if jobutil.IsJobSucceeded(j) {
// a job does not have to be in active list, as long as it has completed successfully, we will process the timestamp
if cronJob.Status.LastSuccessfulTime == nil {
cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime

View File

@ -277,31 +277,6 @@ func getTimeHashInMinutes(scheduledTime time.Time) int64 {
return scheduledTime.Unix() / 60
}
func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
// IsJobFinished returns whether or not a job has completed successfully or failed.
func IsJobFinished(j *batchv1.Job) bool {
isFinished, _ := getFinishedStatus(j)
return isFinished
}
// IsJobSucceeded returns whether a job has completed successfully.
func IsJobSucceeded(j *batchv1.Job) bool {
for _, c := range j.Status.Conditions {
if c.Type == batchv1.JobComplete && c.Status == corev1.ConditionTrue {
return true
}
}
return false
}
// byJobStartTime sorts a list of jobs by start timestamp, using their names as a tie breaker.
type byJobStartTime []*batchv1.Job

View File

@ -706,59 +706,6 @@ func TestNextScheduleTimeDuration(t *testing.T) {
}
}
func TestIsJobSucceeded(t *testing.T) {
tests := map[string]struct {
job batchv1.Job
wantResult bool
}{
"job doesn't have any conditions": {
wantResult: false,
},
"job has Complete=True condition": {
job: batchv1.Job{
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuspended,
Status: v1.ConditionFalse,
},
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
wantResult: true,
},
"job has Complete=False condition": {
job: batchv1.Job{
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
},
{
Type: batchv1.JobComplete,
Status: v1.ConditionFalse,
},
},
},
},
wantResult: false,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
gotResult := IsJobSucceeded(&tc.job)
if tc.wantResult != gotResult {
t.Errorf("unexpected result, want=%v, got=%v", tc.wantResult, gotResult)
}
})
}
}
func topOfTheHour() *time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {

View File

@ -50,6 +50,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/job/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
@ -426,7 +427,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
return
}
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil || IsJobFinished(job) {
if job == nil || util.IsJobFinished(job) {
// syncJob will not remove this finalizer.
if hasFinalizer {
jm.enqueueOrphanPod(pod)
@ -480,7 +481,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
// The job shouldn't be marked as finished until all pod finalizers are removed.
// This is a backup operation in this case.
if IsJobFinished(curJob) {
if util.IsJobFinished(curJob) {
jm.cleanupPodFinalizers(curJob)
}
@ -655,7 +656,7 @@ func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error {
return nil
}
}
if job != nil && !IsJobFinished(job) {
if job != nil && !util.IsJobFinished(job) {
// The pod was adopted. Do not remove finalizer.
return nil
}
@ -766,7 +767,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
job := *sharedJob.DeepCopy()
// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
if util.IsJobFinished(&job) {
err := jm.podBackoffStore.removeBackoffRecord(key)
if err != nil {
// re-syncing here as the record has to be removed for finished/deleted jobs

View File

@ -14,18 +14,35 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package job
package util
import (
batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
// FinishedCondition returns true if a job is finished as well as the condition type indicating that.
// Returns false and no condition type otherwise
func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func IsJobFinished(j *batch.Job) bool {
isFinished, _ := FinishedCondition(j)
return isFinished
}
// IsJobSucceeded returns whether a job has completed successfully.
func IsJobSucceeded(j *batch.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
if c.Type == batch.JobComplete && c.Status == v1.ConditionTrue {
return true
}
}

View File

@ -0,0 +1,209 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
)
func TestFinishedCondition(t *testing.T) {
tests := map[string]struct {
conditions []batch.JobCondition
wantJobFinished bool
wantConditionType batch.JobConditionType
}{
"Job doesn't have any conditions": {
wantJobFinished: false,
wantConditionType: "",
},
"Job is completed and condition is true": {
conditions: []batch.JobCondition{
{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
},
},
wantJobFinished: true,
wantConditionType: batch.JobComplete,
},
"Job is completed and condition is false": {
conditions: []batch.JobCondition{
{
Type: batch.JobComplete,
Status: v1.ConditionFalse,
},
},
wantJobFinished: false,
wantConditionType: "",
},
"Job is completed and condition is unknown": {
conditions: []batch.JobCondition{
{
Type: batch.JobComplete,
Status: v1.ConditionUnknown,
},
},
wantJobFinished: false,
wantConditionType: "",
},
"Job has multiple conditions, one of them being complete and condition true": {
conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: v1.ConditionFalse,
},
{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
},
{
Type: batch.JobFailed,
Status: v1.ConditionFalse,
},
},
wantJobFinished: true,
wantConditionType: batch.JobComplete,
},
"Job is failed and condition is true": {
conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
},
},
wantJobFinished: true,
wantConditionType: batch.JobFailed,
},
"Job is failed and condition is false": {
conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionFalse,
},
},
wantJobFinished: false,
wantConditionType: "",
},
"Job is failed and condition is unknown": {
conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionUnknown,
},
},
wantJobFinished: false,
wantConditionType: "",
},
"Job has multiple conditions, none of them has condition true": {
conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: v1.ConditionFalse,
},
{
Type: batch.JobComplete,
Status: v1.ConditionFalse,
},
{
Type: batch.JobFailed,
Status: v1.ConditionFalse,
},
},
wantJobFinished: false,
wantConditionType: "",
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
job := &batch.Job{
Status: batch.JobStatus{
Conditions: test.conditions,
},
}
isJobFinished, conditionType := FinishedCondition(job)
if isJobFinished != test.wantJobFinished {
if test.wantJobFinished {
t.Error("Expected the job to be finished")
} else {
t.Error("Expected the job to be unfinished")
}
}
if conditionType != test.wantConditionType {
t.Errorf("Unexpected job condition type. got: '%v', want: '%v'", conditionType, test.wantConditionType)
}
})
}
}
func TestIsJobSucceeded(t *testing.T) {
tests := map[string]struct {
job batch.Job
wantResult bool
}{
"job doesn't have any conditions": {
wantResult: false,
},
"job has Complete=True condition": {
job: batch.Job{
Status: batch.JobStatus{
Conditions: []batch.JobCondition{
{
Type: batch.JobSuspended,
Status: v1.ConditionFalse,
},
{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
wantResult: true,
},
"job has Complete=False condition": {
job: batch.Job{
Status: batch.JobStatus{
Conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
},
{
Type: batch.JobComplete,
Status: v1.ConditionFalse,
},
},
},
},
wantResult: false,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
gotResult := IsJobSucceeded(&tc.job)
if tc.wantResult != gotResult {
t.Errorf("unexpected result, want=%v, got=%v", tc.wantResult, gotResult)
}
})
}
}

View File

@ -1,82 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"testing"
batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
)
func TestIsJobFinished(t *testing.T) {
testCases := map[string]struct {
conditionType batch.JobConditionType
conditionStatus v1.ConditionStatus
expectJobNotFinished bool
}{
"Job is completed and condition is true": {
batch.JobComplete,
v1.ConditionTrue,
false,
},
"Job is completed and condition is false": {
batch.JobComplete,
v1.ConditionFalse,
true,
},
"Job is completed and condition is unknown": {
batch.JobComplete,
v1.ConditionUnknown,
true,
},
"Job is failed and condition is true": {
batch.JobFailed,
v1.ConditionTrue,
false,
},
"Job is failed and condition is false": {
batch.JobFailed,
v1.ConditionFalse,
true,
},
"Job is failed and condition is unknown": {
batch.JobFailed,
v1.ConditionUnknown,
true,
},
}
for name, tc := range testCases {
job := &batch.Job{
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: tc.conditionType,
Status: tc.conditionStatus,
}},
},
}
if tc.expectJobNotFinished == IsJobFinished(job) {
if tc.expectJobNotFinished {
t.Errorf("test name: %s, job was not expected to be finished", name)
} else {
t.Errorf("test name: %s, job was expected to be finished", name)
}
}
}
}

View File

@ -37,7 +37,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubernetes/pkg/controller"
jobutil "k8s.io/kubernetes/pkg/controller/job"
jobutil "k8s.io/kubernetes/pkg/controller/job/util"
"k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics"
"k8s.io/utils/clock"
)

View File

@ -39,7 +39,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/retry"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/controller/job"
jobutil "k8s.io/kubernetes/pkg/controller/job/util"
"k8s.io/kubernetes/test/e2e/framework"
e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
@ -715,7 +715,7 @@ func waitForAnyFinishedJob(ctx context.Context, c clientset.Interface, ns string
return false, err
}
for i := range jobs.Items {
if job.IsJobFinished(&jobs.Items[i]) {
if jobutil.IsJobFinished(&jobs.Items[i]) {
return true, nil
}
}
@ -761,7 +761,7 @@ func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job {
func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
for i := range jobs.Items {
j := jobs.Items[i]
if !job.IsJobFinished(&j) {
if !jobutil.IsJobFinished(&j) {
active = append(active, &j)
} else {
finished = append(finished, &j)