mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
convert to stardard lister, use []*batchv1.Job instead of []batchv1.Job
This commit is contained in:
parent
667d1c2c3f
commit
41c82e69ed
@ -19,6 +19,7 @@ package cronjob
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/robfig/cron"
|
||||
@ -181,13 +182,13 @@ func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cronJob, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled)
|
||||
cj, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled)
|
||||
if err != nil {
|
||||
klog.V(2).InfoS("error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = jm.cleanupFinishedJobs(cronJob, jobsToBeReconciled)
|
||||
err = jm.cleanupFinishedJobs(cj, jobsToBeReconciled)
|
||||
if err != nil {
|
||||
klog.V(2).InfoS("error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err)
|
||||
return nil, err
|
||||
@ -222,7 +223,7 @@ func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *me
|
||||
return cronJob
|
||||
}
|
||||
|
||||
func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]batchv1.Job, error) {
|
||||
func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]*batchv1.Job, error) {
|
||||
var jobSelector labels.Selector
|
||||
if len(cronJob.Spec.JobTemplate.Labels) == 0 {
|
||||
jobSelector = labels.Everything()
|
||||
@ -234,13 +235,13 @@ func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jobsToBeReconciled := []batchv1.Job{}
|
||||
jobsToBeReconciled := []*batchv1.Job{}
|
||||
|
||||
for _, job := range jobList {
|
||||
// If it has a ControllerRef, that's all that matters.
|
||||
if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name {
|
||||
// this job is needs to be reconciled
|
||||
jobsToBeReconciled = append(jobsToBeReconciled, *job)
|
||||
jobsToBeReconciled = append(jobsToBeReconciled, job)
|
||||
}
|
||||
}
|
||||
return jobsToBeReconciled, nil
|
||||
@ -397,7 +398,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
|
||||
// that mutates the object
|
||||
func (jm *ControllerV2) syncCronJob(
|
||||
cj *batchv1beta1.CronJob,
|
||||
js []batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) {
|
||||
js []*batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) {
|
||||
|
||||
cj = cj.DeepCopy()
|
||||
now := jm.now()
|
||||
@ -406,14 +407,14 @@ func (jm *ControllerV2) syncCronJob(
|
||||
for _, j := range js {
|
||||
childrenJobs[j.ObjectMeta.UID] = true
|
||||
found := inActiveList(*cj, j.ObjectMeta.UID)
|
||||
if !found && !IsJobFinished(&j) {
|
||||
if !found && !IsJobFinished(j) {
|
||||
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
|
||||
// We found an unfinished job that has us as the parent, but it is not in our Active list.
|
||||
// This could happen if we crashed right after creating the Job and before updating the status,
|
||||
// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
|
||||
// a job that they wanted us to adopt.
|
||||
} else if found && IsJobFinished(&j) {
|
||||
_, status := getFinishedStatus(&j)
|
||||
} else if found && IsJobFinished(j) {
|
||||
_, status := getFinishedStatus(j)
|
||||
deleteFromActiveList(cj, j.ObjectMeta.UID)
|
||||
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
|
||||
}
|
||||
@ -597,17 +598,17 @@ func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duratio
|
||||
}
|
||||
|
||||
// cleanupFinishedJobs cleanups finished jobs created by a CronJob
|
||||
func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job) error {
|
||||
func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []*batchv1.Job) error {
|
||||
// If neither limits are active, there is no need to do anything.
|
||||
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
failedJobs := []batchv1.Job{}
|
||||
successfulJobs := []batchv1.Job{}
|
||||
failedJobs := []*batchv1.Job{}
|
||||
successfulJobs := []*batchv1.Job{}
|
||||
|
||||
for _, job := range js {
|
||||
isFinished, finishedStatus := getFinishedStatus(&job)
|
||||
isFinished, finishedStatus := jm.getFinishedStatus(job)
|
||||
if isFinished && finishedStatus == batchv1.JobComplete {
|
||||
successfulJobs = append(successfulJobs, job)
|
||||
} else if isFinished && finishedStatus == batchv1.JobFailed {
|
||||
@ -616,19 +617,15 @@ func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batch
|
||||
}
|
||||
|
||||
if cj.Spec.SuccessfulJobsHistoryLimit != nil {
|
||||
removeOldestJobs(cj,
|
||||
jm.removeOldestJobs(cj,
|
||||
successfulJobs,
|
||||
jm.jobControl,
|
||||
*cj.Spec.SuccessfulJobsHistoryLimit,
|
||||
jm.recorder)
|
||||
*cj.Spec.SuccessfulJobsHistoryLimit)
|
||||
}
|
||||
|
||||
if cj.Spec.FailedJobsHistoryLimit != nil {
|
||||
removeOldestJobs(cj,
|
||||
jm.removeOldestJobs(cj,
|
||||
failedJobs,
|
||||
jm.jobControl,
|
||||
*cj.Spec.FailedJobsHistoryLimit,
|
||||
jm.recorder)
|
||||
*cj.Spec.FailedJobsHistoryLimit)
|
||||
}
|
||||
|
||||
// Update the CronJob, in case jobs were removed from the list.
|
||||
@ -636,6 +633,32 @@ func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batch
|
||||
return err
|
||||
}
|
||||
|
||||
func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
|
||||
for _, c := range j.Status.Conditions {
|
||||
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
|
||||
return true, c.Type
|
||||
}
|
||||
}
|
||||
return false, ""
|
||||
}
|
||||
|
||||
// removeOldestJobs removes the oldest jobs from a list of jobs
|
||||
func (jm *ControllerV2) removeOldestJobs(cj *batchv1beta1.CronJob, js []*batchv1.Job, maxJobs int32) {
|
||||
numToDelete := len(js) - int(maxJobs)
|
||||
if numToDelete <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
|
||||
klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
|
||||
|
||||
sort.Sort(byJobStartTimeStar(js))
|
||||
for i := 0; i < numToDelete; i++ {
|
||||
klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
|
||||
deleteJob(cj, js[i], jm.jobControl, jm.recorder)
|
||||
}
|
||||
}
|
||||
|
||||
// isJobInActiveList take a job and checks if activeJobs has a job with the same
|
||||
// name and namespace.
|
||||
func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bool {
|
||||
|
@ -164,7 +164,7 @@ func Test_syncOne2(t *testing.T) {
|
||||
job *batchv1.Job
|
||||
err error
|
||||
)
|
||||
js := []batchv1.Job{}
|
||||
js := []*batchv1.Job{}
|
||||
if tc.ranPreviously {
|
||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
|
||||
cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
|
||||
@ -181,7 +181,7 @@ func Test_syncOne2(t *testing.T) {
|
||||
}
|
||||
cj.Status.Active = []v1.ObjectReference{*ref}
|
||||
if !tc.jobStillNotFoundInLister {
|
||||
js = append(js, *job)
|
||||
js = append(js, job)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -458,7 +458,7 @@ func TestControllerV2_getJobList(t *testing.T) {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []batchv1.Job
|
||||
want []*batchv1.Job
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
@ -476,7 +476,7 @@ func TestControllerV2_getJobList(t *testing.T) {
|
||||
},
|
||||
}}},
|
||||
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
|
||||
want: []batchv1.Job{},
|
||||
want: []*batchv1.Job{},
|
||||
},
|
||||
{
|
||||
name: "test getting jobs in namespace with a controller reference",
|
||||
@ -499,7 +499,7 @@ func TestControllerV2_getJobList(t *testing.T) {
|
||||
},
|
||||
}}},
|
||||
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
|
||||
want: []batchv1.Job{{
|
||||
want: []*batchv1.Job{{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns",
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
@ -524,7 +524,7 @@ func TestControllerV2_getJobList(t *testing.T) {
|
||||
},
|
||||
}}},
|
||||
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
|
||||
want: []batchv1.Job{},
|
||||
want: []*batchv1.Job{},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
@ -298,3 +298,22 @@ func (o byJobStartTime) Less(i, j int) bool {
|
||||
}
|
||||
return o[i].Status.StartTime.Before(o[j].Status.StartTime)
|
||||
}
|
||||
|
||||
// byJobStartTimeStar sorts a list of jobs by start timestamp, using their names as a tie breaker.
|
||||
type byJobStartTimeStar []*batchv1.Job
|
||||
|
||||
func (o byJobStartTimeStar) Len() int { return len(o) }
|
||||
func (o byJobStartTimeStar) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
|
||||
|
||||
func (o byJobStartTimeStar) Less(i, j int) bool {
|
||||
if o[i].Status.StartTime == nil && o[j].Status.StartTime != nil {
|
||||
return false
|
||||
}
|
||||
if o[i].Status.StartTime != nil && o[j].Status.StartTime == nil {
|
||||
return true
|
||||
}
|
||||
if o[i].Status.StartTime.Equal(o[j].Status.StartTime) {
|
||||
return o[i].Name < o[j].Name
|
||||
}
|
||||
return o[i].Status.StartTime.Before(o[j].Status.StartTime)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user