Switch cronjob controller to batch/v1

This commit is contained in:
Maciej Szulik 2021-02-24 22:12:26 +01:00
parent 7c194bb3e3
commit 78f51f8fa5
No known key found for this signature in database
GPG Key ID: F15E55D276FA84C4
8 changed files with 84 additions and 94 deletions

View File

@ -44,12 +44,12 @@ func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
} }
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) { func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "cronjobs"}] {
return nil, false, nil return nil, false, nil
} }
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) {
cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(), cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(),
ctx.InformerFactory.Batch().V1beta1().CronJobs(), ctx.InformerFactory.Batch().V1().CronJobs(),
ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
) )
if err != nil { if err != nil {

View File

@ -36,7 +36,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -56,7 +55,7 @@ import (
// Utilities for dealing with Jobs and CronJobs and time. // Utilities for dealing with Jobs and CronJobs and time.
// controllerKind contains the schema.GroupVersionKind for this controller type. // controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob") var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
// Controller is a controller for CronJobs. // Controller is a controller for CronJobs.
type Controller struct { type Controller struct {
@ -129,11 +128,11 @@ func (jm *Controller) syncAll() {
klog.V(4).Infof("Found %d groups", len(jobsByCj)) klog.V(4).Infof("Found %d groups", len(jobsByCj))
err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(ctx, opts) return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts)
}).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error { }).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
cj, ok := object.(*batchv1beta1.CronJob) cj, ok := object.(*batchv1.CronJob)
if !ok { if !ok {
return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj) return fmt.Errorf("expected type *batchv1.CronJob, got type %T", cj)
} }
syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder) syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder) cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
@ -147,7 +146,7 @@ func (jm *Controller) syncAll() {
} }
// cleanupFinishedJobs cleanups finished jobs created by a CronJob // cleanupFinishedJobs cleanups finished jobs created by a CronJob
func cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface,
cjc cjControlInterface, recorder record.EventRecorder) { cjc cjControlInterface, recorder record.EventRecorder) {
// If neither limits are active, there is no need to do anything. // If neither limits are active, there is no need to do anything.
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
@ -190,7 +189,7 @@ func cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobContr
} }
// removeOldestJobs removes the oldest jobs from a list of jobs // removeOldestJobs removes the oldest jobs from a list of jobs
func removeOldestJobs(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) { func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
numToDelete := len(js) - int(maxJobs) numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 { if numToDelete <= 0 {
return return
@ -210,7 +209,7 @@ func removeOldestJobs(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlI
// All known jobs created by "cj" should be included in "js". // All known jobs created by "cj" should be included in "js".
// The current time is passed in to facilitate testing. // The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing. // It has no receiver, to facilitate testing.
func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) { func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
childrenJobs := make(map[types.UID]bool) childrenJobs := make(map[types.UID]bool)
@ -296,7 +295,7 @@ func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
// can see easily that there was a missed execution. // can see easily that there was a missed execution.
return return
} }
if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 { if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs, // Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one. // there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod). // (because we haven't seen the status update to the SJ or the created pod).
@ -309,7 +308,7 @@ func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog) klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return return
} }
if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cj.Status.Active { for _, j := range cj.Status.Active {
klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
@ -367,7 +366,7 @@ func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
} }
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
func deleteJob(cj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool { func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
// delete the job itself... // delete the job itself...

View File

@ -23,7 +23,6 @@ import (
"time" "time"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchV1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -106,19 +105,18 @@ func startTimeStringToTime(startTime string) time.Time {
} }
// returns a cronJob with some fields filled in. // returns a cronJob with some fields filled in.
func cronJob() batchV1beta1.CronJob { func cronJob() batchv1.CronJob {
return batchV1beta1.CronJob{ return batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob", Name: "mycronjob",
Namespace: "snazzycats", Namespace: "snazzycats",
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v1beta1/namespaces/snazzycats/cronjobs/mycronjob",
CreationTimestamp: metav1.Time{Time: justBeforeTheHour()}, CreationTimestamp: metav1.Time{Time: justBeforeTheHour()},
}, },
Spec: batchV1beta1.CronJobSpec{ Spec: batchv1.CronJobSpec{
Schedule: "* * * * ?", Schedule: "* * * * ?",
ConcurrencyPolicy: batchV1beta1.AllowConcurrent, ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchV1beta1.JobTemplateSpec{ JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"}, Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"}, Annotations: map[string]string{"x": "y"},
@ -155,7 +153,6 @@ func newJob(UID string) batchv1.Job {
UID: types.UID(UID), UID: types.UID(UID),
Name: "foobar", Name: "foobar",
Namespace: metav1.NamespaceDefault, Namespace: metav1.NamespaceDefault,
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/myjob",
}, },
Spec: jobSpec(), Spec: jobSpec(),
} }
@ -166,9 +163,9 @@ var (
mediumDead int64 = 2 * 60 * 60 mediumDead int64 = 2 * 60 * 60
longDead int64 = 1000000 longDead int64 = 1000000
noDead int64 = -12345 noDead int64 = -12345
A = batchV1beta1.AllowConcurrent A = batchv1.AllowConcurrent
f = batchV1beta1.ForbidConcurrent f = batchv1.ForbidConcurrent
R = batchV1beta1.ReplaceConcurrent R = batchv1.ReplaceConcurrent
T = true T = true
F = false F = false
) )
@ -189,7 +186,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
// cj spec // cj spec
concurrencyPolicy batchV1beta1.ConcurrencyPolicy concurrencyPolicy batchv1.ConcurrencyPolicy
suspend bool suspend bool
schedule string schedule string
deadline int64 deadline int64
@ -314,7 +311,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
if controllerRef == nil { if controllerRef == nil {
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job) t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
} else { } else {
if got, want := controllerRef.APIVersion, "batch/v1beta1"; got != want { if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want) t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
} }
if got, want := controllerRef.Kind, "CronJob"; got != want { if got, want := controllerRef.Kind, "CronJob"; got != want {
@ -617,7 +614,7 @@ func TestSyncOne_Status(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
// cj spec // cj spec
concurrencyPolicy batchV1beta1.ConcurrencyPolicy concurrencyPolicy batchv1.ConcurrencyPolicy
suspend bool suspend bool
schedule string schedule string
deadline int64 deadline int64

View File

@ -25,7 +25,6 @@ import (
"github.com/robfig/cron" "github.com/robfig/cron"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,12 +33,10 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
batchv1informers "k8s.io/client-go/informers/batch/v1" batchv1informers "k8s.io/client-go/informers/batch/v1"
batchv1beta1informers "k8s.io/client-go/informers/batch/v1beta1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
covev1client "k8s.io/client-go/kubernetes/typed/core/v1" covev1client "k8s.io/client-go/kubernetes/typed/core/v1"
batchv1listers "k8s.io/client-go/listers/batch/v1" batchv1listers "k8s.io/client-go/listers/batch/v1"
batchv1beta1listers "k8s.io/client-go/listers/batch/v1beta1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -63,7 +60,7 @@ type ControllerV2 struct {
cronJobControl cjControlInterface cronJobControl cjControlInterface
jobLister batchv1listers.JobLister jobLister batchv1listers.JobLister
cronJobLister batchv1beta1listers.CronJobLister cronJobLister batchv1listers.CronJobLister
jobListerSynced cache.InformerSynced jobListerSynced cache.InformerSynced
cronJobListerSynced cache.InformerSynced cronJobListerSynced cache.InformerSynced
@ -73,7 +70,7 @@ type ControllerV2 struct {
} }
// NewControllerV2 creates and initializes a new Controller. // NewControllerV2 creates and initializes a new Controller.
func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1beta1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@ -208,7 +205,7 @@ func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) {
// resolveControllerRef returns the controller referenced by a ControllerRef, // resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller // or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind. // of the correct Kind.
func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1beta1.CronJob { func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1.CronJob {
// We can't look up by UID, so look up by Name and then verify UID. // We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind. // Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind { if controllerRef.Kind != controllerKind.Kind {
@ -226,7 +223,7 @@ func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *me
return cronJob return cronJob
} }
func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]*batchv1.Job, error) { func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1.CronJob) ([]*batchv1.Job, error) {
var jobSelector labels.Selector var jobSelector labels.Selector
if len(cronJob.Spec.JobTemplate.Labels) == 0 { if len(cronJob.Spec.JobTemplate.Labels) == 0 {
jobSelector = labels.Everything() jobSelector = labels.Everything()
@ -359,8 +356,8 @@ func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration)
// updateCronJob re-queues the CronJob for next scheduled time if there is a // updateCronJob re-queues the CronJob for next scheduled time if there is a
// change in spec.schedule otherwise it re-queues it now // change in spec.schedule otherwise it re-queues it now
func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) { func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
oldCJ, okOld := old.(*batchv1beta1.CronJob) oldCJ, okOld := old.(*batchv1.CronJob)
newCJ, okNew := curr.(*batchv1beta1.CronJob) newCJ, okNew := curr.(*batchv1.CronJob)
if !okOld || !okNew { if !okOld || !okNew {
// typecasting of one failed, handle this better, may be log entry // typecasting of one failed, handle this better, may be log entry
@ -400,8 +397,8 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
// It returns a copy of the CronJob that is to be used by other functions // It returns a copy of the CronJob that is to be used by other functions
// that mutates the object // that mutates the object
func (jm *ControllerV2) syncCronJob( func (jm *ControllerV2) syncCronJob(
cj *batchv1beta1.CronJob, cj *batchv1.CronJob,
js []*batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) { js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) {
cj = cj.DeepCopy() cj = cj.DeepCopy()
now := jm.now() now := jm.now()
@ -525,7 +522,7 @@ func (jm *ControllerV2) syncCronJob(
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return cj, t, nil return cj, t, nil
} }
if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 { if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs, // Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one. // there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod). // (because we haven't seen the status update to the SJ or the created pod).
@ -540,7 +537,7 @@ func (jm *ControllerV2) syncCronJob(
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return cj, t, nil return cj, t, nil
} }
if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cj.Status.Active { for _, j := range cj.Status.Active {
klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name)) klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
@ -604,7 +601,7 @@ func (jm *ControllerV2) syncCronJob(
return cj, t, nil return cj, t, nil
} }
func getJobName(cj *batchv1beta1.CronJob, scheduledTime time.Time) string { func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime)) return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime))
} }
@ -619,7 +616,7 @@ func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duratio
} }
// cleanupFinishedJobs cleanups finished jobs created by a CronJob // cleanupFinishedJobs cleanups finished jobs created by a CronJob
func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []*batchv1.Job) error { func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1.CronJob, js []*batchv1.Job) error {
// If neither limits are active, there is no need to do anything. // If neither limits are active, there is no need to do anything.
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
return nil return nil
@ -664,7 +661,7 @@ func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobCond
} }
// removeOldestJobs removes the oldest jobs from a list of jobs // removeOldestJobs removes the oldest jobs from a list of jobs
func (jm *ControllerV2) removeOldestJobs(cj *batchv1beta1.CronJob, js []*batchv1.Job, maxJobs int32) { func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) {
numToDelete := len(js) - int(maxJobs) numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 { if numToDelete <= 0 {
return return

View File

@ -24,9 +24,9 @@ import (
"time" "time"
"github.com/robfig/cron" "github.com/robfig/cron"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -62,7 +62,7 @@ func Test_syncCronJob(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
// cj spec // cj spec
concurrencyPolicy batchv1beta1.ConcurrencyPolicy concurrencyPolicy batchv1.ConcurrencyPolicy
suspend bool suspend bool
schedule string schedule string
deadline int64 deadline int64
@ -241,7 +241,7 @@ func Test_syncCronJob(t *testing.T) {
if controllerRef == nil { if controllerRef == nil {
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job) t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
} else { } else {
if got, want := controllerRef.APIVersion, "batch/v1beta1"; got != want { if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want) t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
} }
if got, want := controllerRef.Kind, "CronJob"; got != want { if got, want := controllerRef.Kind, "CronJob"; got != want {
@ -332,8 +332,8 @@ func TestController2_updateCronJob(t *testing.T) {
cronJobControl cjControlInterface cronJobControl cjControlInterface
} }
type args struct { type args struct {
oldJobTemplate *batchv1beta1.JobTemplateSpec oldJobTemplate *batchv1.JobTemplateSpec
newJobTemplate *batchv1beta1.JobTemplateSpec newJobTemplate *batchv1.JobTemplateSpec
oldJobSchedule string oldJobSchedule string
newJobSchedule string newJobSchedule string
} }
@ -352,14 +352,14 @@ func TestController2_updateCronJob(t *testing.T) {
cronJobControl: cjc, cronJobControl: cjc,
}, },
args: args{ args: args{
oldJobTemplate: &batchv1beta1.JobTemplateSpec{ oldJobTemplate: &batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"}, Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"}, Annotations: map[string]string{"x": "y"},
}, },
Spec: jobSpec(), Spec: jobSpec(),
}, },
newJobTemplate: &batchv1beta1.JobTemplateSpec{ newJobTemplate: &batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "foo"}, Labels: map[string]string{"a": "foo"},
Annotations: map[string]string{"x": "y"}, Annotations: map[string]string{"x": "y"},
@ -459,7 +459,7 @@ func TestControllerV2_getJobList(t *testing.T) {
jobLister batchv1listers.JobLister jobLister batchv1listers.JobLister
} }
type args struct { type args struct {
cronJob *batchv1beta1.CronJob cronJob *batchv1.CronJob
} }
tests := []struct { tests := []struct {
name string name string
@ -482,7 +482,7 @@ func TestControllerV2_getJobList(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"},
}, },
}}}, }}},
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, args: args{cronJob: &batchv1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
want: []*batchv1.Job{}, want: []*batchv1.Job{},
}, },
{ {
@ -505,7 +505,7 @@ func TestControllerV2_getJobList(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"},
}, },
}}}, }}},
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, args: args{cronJob: &batchv1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
want: []*batchv1.Job{{ want: []*batchv1.Job{{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns", ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns",
OwnerReferences: []metav1.OwnerReference{ OwnerReferences: []metav1.OwnerReference{
@ -530,7 +530,7 @@ func TestControllerV2_getJobList(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "bar-ns"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "bar-ns"},
}, },
}}}, }}},
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, args: args{cronJob: &batchv1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
want: []*batchv1.Job{}, want: []*batchv1.Job{},
}, },
} }

View File

@ -19,15 +19,14 @@ package cronjob
import ( import (
"context" "context"
"fmt" "fmt"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"sync" "sync"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@ -36,9 +35,9 @@ import (
// cjControlInterface is an interface that knows how to update CronJob status // cjControlInterface is an interface that knows how to update CronJob status
// created as an interface to allow testing. // created as an interface to allow testing.
type cjControlInterface interface { type cjControlInterface interface {
UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error)
// GetCronJob retrieves a CronJob. // GetCronJob retrieves a CronJob.
GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) GetCronJob(namespace, name string) (*batchv1.CronJob, error)
} }
// realCJControl is the default implementation of cjControlInterface. // realCJControl is the default implementation of cjControlInterface.
@ -46,23 +45,23 @@ type realCJControl struct {
KubeClient clientset.Interface KubeClient clientset.Interface
} }
func (c *realCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) { func (c *realCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, error) {
return c.KubeClient.BatchV1beta1().CronJobs(namespace).Get(context.TODO(), name, metav1.GetOptions{}) return c.KubeClient.BatchV1().CronJobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
} }
var _ cjControlInterface = &realCJControl{} var _ cjControlInterface = &realCJControl{}
func (c *realCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) { func (c *realCJControl) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) {
return c.KubeClient.BatchV1beta1().CronJobs(cj.Namespace).UpdateStatus(context.TODO(), cj, metav1.UpdateOptions{}) return c.KubeClient.BatchV1().CronJobs(cj.Namespace).UpdateStatus(context.TODO(), cj, metav1.UpdateOptions{})
} }
// fakeCJControl is the default implementation of cjControlInterface. // fakeCJControl is the default implementation of cjControlInterface.
type fakeCJControl struct { type fakeCJControl struct {
CronJob *batchv1beta1.CronJob CronJob *batchv1.CronJob
Updates []batchv1beta1.CronJob Updates []batchv1.CronJob
} }
func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) { func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, error) {
if name == c.CronJob.Name && namespace == c.CronJob.Namespace { if name == c.CronJob.Name && namespace == c.CronJob.Namespace {
return c.CronJob, nil return c.CronJob, nil
} }
@ -74,7 +73,7 @@ func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJo
var _ cjControlInterface = &fakeCJControl{} var _ cjControlInterface = &fakeCJControl{}
func (c *fakeCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) { func (c *fakeCJControl) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) {
c.Updates = append(c.Updates, *cj) c.Updates = append(c.Updates, *cj)
return cj, nil return cj, nil
} }
@ -105,7 +104,7 @@ type realJobControl struct {
var _ jobControlInterface = &realJobControl{} var _ jobControlInterface = &realJobControl{}
func copyLabels(template *batchv1beta1.JobTemplateSpec) labels.Set { func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
l := make(labels.Set) l := make(labels.Set)
for k, v := range template.Labels { for k, v := range template.Labels {
l[k] = v l[k] = v
@ -113,7 +112,7 @@ func copyLabels(template *batchv1beta1.JobTemplateSpec) labels.Set {
return l return l
} }
func copyAnnotations(template *batchv1beta1.JobTemplateSpec) labels.Set { func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
a := make(labels.Set) a := make(labels.Set)
for k, v := range template.Annotations { for k, v := range template.Annotations {
a[k] = v a[k] = v

View File

@ -24,7 +24,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -33,7 +32,7 @@ import (
// Utilities for dealing with Jobs and CronJobs and time. // Utilities for dealing with Jobs and CronJobs and time.
func inActiveList(cj batchv1beta1.CronJob, uid types.UID) bool { func inActiveList(cj batchv1.CronJob, uid types.UID) bool {
for _, j := range cj.Status.Active { for _, j := range cj.Status.Active {
if j.UID == uid { if j.UID == uid {
return true return true
@ -42,7 +41,7 @@ func inActiveList(cj batchv1beta1.CronJob, uid types.UID) bool {
return false return false
} }
func deleteFromActiveList(cj *batchv1beta1.CronJob, uid types.UID) { func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) {
if cj == nil { if cj == nil {
return return
} }
@ -92,7 +91,7 @@ func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
// //
// If there are too many (>100) unstarted times, just give up and return an empty slice. // If there are too many (>100) unstarted times, just give up and return an empty slice.
// If there were missed times prior to the last known start time, then those are not returned. // If there were missed times prior to the last known start time, then those are not returned.
func getRecentUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) { func getRecentUnmetScheduleTimes(cj batchv1.CronJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{} starts := []time.Time{}
sched, err := cron.ParseStandard(cj.Spec.Schedule) sched, err := cron.ParseStandard(cj.Spec.Schedule)
if err != nil { if err != nil {
@ -154,7 +153,7 @@ func getRecentUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time) ([]time
// it returns nil if no unmet schedule times. // it returns nil if no unmet schedule times.
// If there are too many (>100) unstarted times, it will raise a warning and but still return // If there are too many (>100) unstarted times, it will raise a warning and but still return
// the list of missed times. // the list of missed times.
func getNextScheduleTime(cj batchv1beta1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) { func getNextScheduleTime(cj batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
starts := []time.Time{} starts := []time.Time{}
var earliestTime time.Time var earliestTime time.Time
@ -234,7 +233,7 @@ func getMostRecentScheduleTime(earliestTime time.Time, now time.Time, schedule c
} }
// getJobFromTemplate makes a Job from a CronJob // getJobFromTemplate makes a Job from a CronJob
func getJobFromTemplate(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) { func getJobFromTemplate(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
labels := copyLabels(&cj.Spec.JobTemplate) labels := copyLabels(&cj.Spec.JobTemplate)
annotations := copyAnnotations(&cj.Spec.JobTemplate) annotations := copyAnnotations(&cj.Spec.JobTemplate)
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
@ -260,7 +259,7 @@ func getTimeHash(scheduledTime time.Time) int64 {
// getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from // getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from
// epoch time and concatenates that to the job name, because the cronjob_controller v2 has the lowest // epoch time and concatenates that to the job name, because the cronjob_controller v2 has the lowest
// granularity of 1 minute for scheduling job. // granularity of 1 minute for scheduling job.
func getJobFromTemplate2(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) { func getJobFromTemplate2(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
labels := copyLabels(&cj.Spec.JobTemplate) labels := copyLabels(&cj.Spec.JobTemplate)
annotations := copyAnnotations(&cj.Spec.JobTemplate) annotations := copyAnnotations(&cj.Spec.JobTemplate)
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice

View File

@ -25,7 +25,6 @@ import (
cron "github.com/robfig/cron" cron "github.com/robfig/cron"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -40,17 +39,17 @@ func TestGetJobFromTemplate(t *testing.T) {
var one int64 = 1 var one int64 = 1
var no bool var no bool
cj := batchv1beta1.CronJob{ cj := batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob", Name: "mycronjob",
Namespace: "snazzycats", Namespace: "snazzycats",
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob", SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob",
}, },
Spec: batchv1beta1.CronJobSpec{ Spec: batchv1.CronJobSpec{
Schedule: "* * * * ?", Schedule: "* * * * ?",
ConcurrencyPolicy: batchv1beta1.AllowConcurrent, ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1beta1.JobTemplateSpec{ JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"}, Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"}, Annotations: map[string]string{"x": "y"},
@ -98,17 +97,17 @@ func TestGetJobFromTemplate2(t *testing.T) {
var one int64 = 1 var one int64 = 1
var no bool var no bool
cj := batchv1beta1.CronJob{ cj := batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob", Name: "mycronjob",
Namespace: "snazzycats", Namespace: "snazzycats",
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob", SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob",
}, },
Spec: batchv1beta1.CronJobSpec{ Spec: batchv1.CronJobSpec{
Schedule: "* * * * ?", Schedule: "* * * * ?",
ConcurrencyPolicy: batchv1beta1.AllowConcurrent, ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1beta1.JobTemplateSpec{ JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"}, Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"}, Annotations: map[string]string{"x": "y"},
@ -326,16 +325,16 @@ func TestGetNextScheduleTime(t *testing.T) {
t.Errorf("test setup error: %v", err) t.Errorf("test setup error: %v", err)
} }
cj := batchv1beta1.CronJob{ cj := batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob", Name: "mycronjob",
Namespace: metav1.NamespaceDefault, Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
}, },
Spec: batchv1beta1.CronJobSpec{ Spec: batchv1.CronJobSpec{
Schedule: schedule, Schedule: schedule,
ConcurrencyPolicy: batchv1beta1.AllowConcurrent, ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1beta1.JobTemplateSpec{}, JobTemplate: batchv1.JobTemplateSpec{},
}, },
} }
{ {
@ -442,16 +441,16 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
t.Errorf("test setup error: %v", err) t.Errorf("test setup error: %v", err)
} }
cj := batchv1beta1.CronJob{ cj := batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob", Name: "mycronjob",
Namespace: metav1.NamespaceDefault, Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
}, },
Spec: batchv1beta1.CronJobSpec{ Spec: batchv1.CronJobSpec{
Schedule: schedule, Schedule: schedule,
ConcurrencyPolicy: batchv1beta1.AllowConcurrent, ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1beta1.JobTemplateSpec{}, JobTemplate: batchv1.JobTemplateSpec{},
}, },
} }
{ {