From 8d7dd4415e28bded77667ce857e1c58016f9ab3a Mon Sep 17 00:00:00 2001 From: Alay Patel Date: Sun, 11 Oct 2020 02:49:11 -0400 Subject: [PATCH] add cronjob_controllerv2.go --- cmd/kube-controller-manager/app/batch.go | 14 +- .../app/options/cronjobcontroller.go | 56 ++ .../app/options/options.go | 9 + .../app/options/options_test.go | 9 + hack/.golint_failures | 1 + .../cronjob/cronjob_controllerv2.go | 648 ++++++++++++++++++ .../cronjob/cronjob_controllerv2_test.go | 545 +++++++++++++++ pkg/controller/cronjob/injection.go | 10 + pkg/controller/cronjob/utils.go | 100 ++- pkg/controller/cronjob/utils_test.go | 191 ++++++ pkg/features/kube_features.go | 11 + test/e2e/apps/cronjob.go | 26 + 12 files changed, 1616 insertions(+), 4 deletions(-) create mode 100644 cmd/kube-controller-manager/app/options/cronjobcontroller.go create mode 100644 pkg/controller/cronjob/cronjob_controllerv2.go create mode 100644 pkg/controller/cronjob/cronjob_controllerv2_test.go diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index e4528a99500..f42dfca418e 100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -22,12 +22,13 @@ package app import ( "fmt" - "net/http" "k8s.io/apimachinery/pkg/runtime/schema" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/controller/cronjob" "k8s.io/kubernetes/pkg/controller/job" + kubefeatures "k8s.io/kubernetes/pkg/features" ) func startJobController(ctx ControllerContext) (http.Handler, bool, error) { @@ -46,6 +47,17 @@ func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { return nil, false, nil } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) { + cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(), + ctx.InformerFactory.Batch().V1beta1().CronJobs(), + ctx.ClientBuilder.ClientOrDie("cronjob-controller"), + ) + if err != nil { + return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) + } + go cj2c.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop) + return nil, true, nil + } cjc, err := cronjob.NewController( ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ) diff --git a/cmd/kube-controller-manager/app/options/cronjobcontroller.go b/cmd/kube-controller-manager/app/options/cronjobcontroller.go new file mode 100644 index 00000000000..48f03ca67c0 --- /dev/null +++ b/cmd/kube-controller-manager/app/options/cronjobcontroller.go @@ -0,0 +1,56 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "github.com/spf13/pflag" + + cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config" +) + +// CronJobControllerOptions holds the CronJobController options. 
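+// It embeds a pointer to the shared CronJobControllerConfiguration so that
+// ApplyTo can copy the flag-driven values (currently only ConcurrentCronJobSyncs)
+// straight into the controller-manager component config.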
+type CronJobControllerOptions struct { + *cronjobconfig.CronJobControllerConfiguration +} + +// AddFlags adds flags related to JobController for controller manager to the specified FlagSet. +func (o *CronJobControllerOptions) AddFlags(fs *pflag.FlagSet) { + if o == nil { + return + } +} + +// ApplyTo fills up JobController config with options. +func (o *CronJobControllerOptions) ApplyTo(cfg *cronjobconfig.CronJobControllerConfiguration) error { + if o == nil { + return nil + } + + cfg.ConcurrentCronJobSyncs = o.ConcurrentCronJobSyncs + + return nil +} + +// Validate checks validation of CronJobControllerOptions. +func (o *CronJobControllerOptions) Validate() []error { + if o == nil { + return nil + } + + errs := []error{} + return errs +} diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 876cde18943..d0475bd0c20 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -72,6 +72,7 @@ type KubeControllerManagerOptions struct { GarbageCollectorController *GarbageCollectorControllerOptions HPAController *HPAControllerOptions JobController *JobControllerOptions + CronJobController *CronJobControllerOptions NamespaceController *NamespaceControllerOptions NodeIPAMController *NodeIPAMControllerOptions NodeLifecycleController *NodeLifecycleControllerOptions @@ -145,6 +146,9 @@ func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) { JobController: &JobControllerOptions{ &componentConfig.JobController, }, + CronJobController: &CronJobControllerOptions{ + &componentConfig.CronJobController, + }, NamespaceController: &NamespaceControllerOptions{ &componentConfig.NamespaceController, }, @@ -245,6 +249,7 @@ func (s *KubeControllerManagerOptions) Flags(allControllers []string, disabledBy s.GarbageCollectorController.AddFlags(fss.FlagSet("garbagecollector controller")) s.HPAController.AddFlags(fss.FlagSet("horizontalpodautoscaling controller")) s.JobController.AddFlags(fss.FlagSet("job controller")) + s.CronJobController.AddFlags(fss.FlagSet("cronjob controller")) s.NamespaceController.AddFlags(fss.FlagSet("namespace controller")) s.NodeIPAMController.AddFlags(fss.FlagSet("nodeipam controller")) s.NodeLifecycleController.AddFlags(fss.FlagSet("nodelifecycle controller")) @@ -310,6 +315,9 @@ func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config) e if err := s.JobController.ApplyTo(&c.ComponentConfig.JobController); err != nil { return err } + if err := s.CronJobController.ApplyTo(&c.ComponentConfig.CronJobController); err != nil { + return err + } if err := s.NamespaceController.ApplyTo(&c.ComponentConfig.NamespaceController); err != nil { return err } @@ -384,6 +392,7 @@ func (s *KubeControllerManagerOptions) Validate(allControllers []string, disable errs = append(errs, s.GarbageCollectorController.Validate()...) errs = append(errs, s.HPAController.Validate()...) errs = append(errs, s.JobController.Validate()...) + errs = append(errs, s.CronJobController.Validate()...) errs = append(errs, s.NamespaceController.Validate()...) errs = append(errs, s.NodeIPAMController.Validate()...) errs = append(errs, s.NodeLifecycleController.Validate()...) 
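Note on the options plumbing above: AddFlags is deliberately a no-op in this patch, so ConcurrentCronJobSyncs comes from the component-config defaults rather than from a command-line flag. If a flag were exposed later, it would presumably follow the same pattern as the other controller options. A minimal sketch, assuming the field is an int32 and using the hypothetical flag name concurrent-cronjob-syncs (not part of this change):

func (o *CronJobControllerOptions) AddFlags(fs *pflag.FlagSet) {
	if o == nil {
		return
	}
	// Hypothetical flag: writes into the embedded CronJobControllerConfiguration,
	// which ApplyTo then copies into the controller-manager component config.
	fs.Int32Var(&o.ConcurrentCronJobSyncs, "concurrent-cronjob-syncs", o.ConcurrentCronJobSyncs,
		"The number of CronJob objects that are allowed to sync concurrently.")
}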
diff --git a/cmd/kube-controller-manager/app/options/options_test.go b/cmd/kube-controller-manager/app/options/options_test.go index 9ddefbd6cd9..8955f9971bc 100644 --- a/cmd/kube-controller-manager/app/options/options_test.go +++ b/cmd/kube-controller-manager/app/options/options_test.go @@ -38,6 +38,7 @@ import ( kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config" kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config" csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config" + cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config" daemonconfig "k8s.io/kubernetes/pkg/controller/daemon/config" deploymentconfig "k8s.io/kubernetes/pkg/controller/deployment/config" endpointconfig "k8s.io/kubernetes/pkg/controller/endpoint/config" @@ -314,6 +315,11 @@ func TestAddFlags(t *testing.T) { ConcurrentJobSyncs: 5, }, }, + CronJobController: &CronJobControllerOptions{ + &cronjobconfig.CronJobControllerConfiguration{ + ConcurrentCronJobSyncs: 5, + }, + }, NamespaceController: &NamespaceControllerOptions{ &namespaceconfig.NamespaceControllerConfiguration{ NamespaceSyncPeriod: metav1.Duration{Duration: 10 * time.Minute}, @@ -566,6 +572,9 @@ func TestApplyTo(t *testing.T) { JobController: jobconfig.JobControllerConfiguration{ ConcurrentJobSyncs: 5, }, + CronJobController: cronjobconfig.CronJobControllerConfiguration{ + ConcurrentCronJobSyncs: 5, + }, NamespaceController: namespaceconfig.NamespaceControllerConfiguration{ NamespaceSyncPeriod: metav1.Duration{Duration: 10 * time.Minute}, ConcurrentNamespaceSyncs: 20, diff --git a/hack/.golint_failures b/hack/.golint_failures index 85d4af8d0ac..afa46d53cfe 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -50,6 +50,7 @@ pkg/controller/certificates pkg/controller/certificates/signer pkg/controller/certificates/signer/config/v1alpha1 pkg/controller/clusterroleaggregation +pkg/controller/cronjob/config/v1alpha1 pkg/controller/daemon pkg/controller/daemon/config/v1alpha1 pkg/controller/deployment diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go new file mode 100644 index 00000000000..13d27a480e1 --- /dev/null +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -0,0 +1,648 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cronjob + +import ( + "fmt" + "reflect" + "time" + + "github.com/robfig/cron" + + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + batchv1informers "k8s.io/client-go/informers/batch/v1" + batchv1beta1informers "k8s.io/client-go/informers/batch/v1beta1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + covev1client "k8s.io/client-go/kubernetes/typed/core/v1" + batchv1listers "k8s.io/client-go/listers/batch/v1" + batchv1beta1listers "k8s.io/client-go/listers/batch/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/component-base/metrics/prometheus/ratelimiter" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller" +) + +var ( + nextScheduleDelta = 100 * time.Millisecond +) + +// ControllerV2 is a controller for CronJobs. +// Refactored Cronjob controller that uses DelayingQueue and informers +type ControllerV2 struct { + queue workqueue.RateLimitingInterface + recorder record.EventRecorder + + jobControl jobControlInterface + cronJobControl cjControlInterface + + jobLister batchv1listers.JobLister + cronJobLister batchv1beta1listers.CronJobLister + + jobListerSynced cache.InformerSynced + cronJobListerSynced cache.InformerSynced + + // now is a function that returns current time, done to facilitate unit tests + now func() time.Time +} + +// NewControllerV2 creates and initializes a new Controller. +func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1beta1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(0) + eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + + if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { + if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { + return nil, err + } + } + + jm := &ControllerV2{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"), + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}), + + jobControl: realJobControl{KubeClient: kubeClient}, + cronJobControl: &realCJControl{KubeClient: kubeClient}, + + jobLister: jobInformer.Lister(), + cronJobLister: cronJobsInformer.Lister(), + + jobListerSynced: jobInformer.Informer().HasSynced, + cronJobListerSynced: cronJobsInformer.Informer().HasSynced, + now: time.Now, + } + + jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: jm.addJob, + UpdateFunc: jm.updateJob, + DeleteFunc: jm.deleteJob, + }) + + cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + jm.enqueueController(obj) + }, + UpdateFunc: jm.updateCronJob, + DeleteFunc: func(obj interface{}) { + jm.enqueueController(obj) + }, + }) + + return jm, nil +} + +// Run starts the main goroutine responsible for watching and syncing jobs. 
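+// It blocks until stopCh is closed: after the Job and CronJob informer caches
+// have synced, it starts the requested number of worker goroutines, each of
+// which drains the rate-limited work queue.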
+func (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer jm.queue.ShutDown() + + klog.Infof("Starting cronjob controller v2") + defer klog.Infof("Shutting down cronjob controller v2") + + if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(jm.worker, time.Second, stopCh) + } + + <-stopCh +} + +func (jm *ControllerV2) worker() { + for jm.processNextWorkItem() { + } +} + +func (jm *ControllerV2) processNextWorkItem() bool { + key, quit := jm.queue.Get() + if quit { + return false + } + defer jm.queue.Done(key) + + requeueAfter, err := jm.sync(key.(string)) + switch { + case err != nil: + utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err)) + jm.queue.AddRateLimited(key) + case requeueAfter != nil: + jm.queue.Forget(key) + jm.queue.AddAfter(key, *requeueAfter) + } + return true +} + +func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) { + ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey) + if err != nil { + return nil, err + } + + cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name) + switch { + case errors.IsNotFound(err): + // may be cronjob is deleted, dont need to requeue this key + klog.V(4).InfoS("cronjob not found, may be it is deleted", "cronjob", klog.KRef(ns, name), "err", err) + return nil, nil + case err != nil: + // for other transient apiserver error requeue with exponential backoff + return nil, err + } + + jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob) + if err != nil { + return nil, err + } + + cronJob, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled) + if err != nil { + klog.V(2).InfoS("error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) + return nil, err + } + + err = jm.cleanupFinishedJobs(cronJob, jobsToBeReconciled) + if err != nil { + klog.V(2).InfoS("error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err) + return nil, err + } + + if requeueAfter != nil { + klog.V(4).InfoS("re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter) + return requeueAfter, nil + } + // this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format + return nil, nil +} + +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the correct Kind. +func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1beta1.CronJob { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { + return nil + } + cronJob, err := jm.cronJobLister.CronJobs(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if cronJob.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. 
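+		// Do not adopt across the UID boundary; treat the reference as unresolved.
+		// (A CronJob with the same name was presumably deleted and recreated.)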
+ return nil + } + return cronJob +} + +func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]batchv1.Job, error) { + var jobSelector labels.Selector + if len(cronJob.Spec.JobTemplate.Labels) == 0 { + jobSelector = labels.Everything() + } else { + jobSelector = labels.Set(cronJob.Spec.JobTemplate.Labels).AsSelector() + } + jobList, err := jm.jobLister.Jobs(cronJob.Namespace).List(jobSelector) + if err != nil { + return nil, err + } + + jobsToBeReconciled := []batchv1.Job{} + + for _, job := range jobList { + // If it has a ControllerRef, that's all that matters. + if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name { + // this job is needs to be reconciled + jobsToBeReconciled = append(jobsToBeReconciled, *job) + } + } + return jobsToBeReconciled, nil +} + +// When a job is created, enqueue the controller that manages it and update it's expectations. +func (jm *ControllerV2) addJob(obj interface{}) { + job := obj.(*batchv1.Job) + if job.DeletionTimestamp != nil { + // on a restart of the controller, it's possible a new job shows up in a state that + // is already pending deletion. Prevent the job from being a creation observation. + jm.deleteJob(job) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := metav1.GetControllerOf(job); controllerRef != nil { + cronJob := jm.resolveControllerRef(job.Namespace, controllerRef) + if cronJob == nil { + return + } + jm.enqueueController(cronJob) + return + } +} + +// updateJob figures out what CronJob(s) manage a Job when the Job +// is updated and wake them up. If the anything of the Job have changed, we need to +// awaken both the old and new CronJob. old and cur must be *batchv1.Job +// types. +func (jm *ControllerV2) updateJob(old, cur interface{}) { + curJob := cur.(*batchv1.Job) + oldJob := old.(*batchv1.Job) + if curJob.ResourceVersion == oldJob.ResourceVersion { + // Periodic resync will send update events for all known jobs. + // Two different versions of the same jobs will always have different RVs. + return + } + + curControllerRef := metav1.GetControllerOf(curJob) + oldControllerRef := metav1.GetControllerOf(oldJob) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if cronJob := jm.resolveControllerRef(oldJob.Namespace, oldControllerRef); cronJob != nil { + jm.enqueueController(cronJob) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + cronJob := jm.resolveControllerRef(curJob.Namespace, curControllerRef) + if cronJob == nil { + return + } + jm.enqueueController(cronJob) + return + } +} + +func (jm *ControllerV2) deleteJob(obj interface{}) { + job, ok := obj.(*batchv1.Job) + + // When a delete is dropped, the relist will notice a job in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. 
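+	// Recover the Job from the tombstone so that its owning CronJob can still be
+	// resolved and re-queued.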
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
+			return
+		}
+		job, ok = tombstone.Obj.(*batchv1.Job)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Job %#v", obj))
+			return
+		}
+	}
+
+	controllerRef := metav1.GetControllerOf(job)
+	if controllerRef == nil {
+		// No controller should care about orphans being deleted.
+		return
+	}
+	cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
+	if cronJob == nil {
+		return
+	}
+	jm.enqueueController(cronJob)
+}
+
+func (jm *ControllerV2) enqueueController(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+
+	jm.queue.Add(key)
+}
+
+func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+
+	jm.queue.AddAfter(key, t)
+}
+
+// updateCronJob re-queues the CronJob for its next scheduled time if there is a
+// change in spec.schedule; otherwise it re-queues it immediately.
+func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
+	oldCJ, okOld := old.(*batchv1beta1.CronJob)
+	newCJ, okNew := curr.(*batchv1beta1.CronJob)
+
+	if !okOld || !okNew {
+		// the type assertion failed for one of the objects; nothing can be done here
+		// (TODO: consider logging this)
+		return
+	}
+	// If the schedule change means the next requeue has to happen sooner than it already was,
+	// that is handled here by the queue. If the next requeue is later than the previously
+	// scheduled one, the sync loop is essentially a no-op for the already-queued key with
+	// the old schedule.
+	if oldCJ.Spec.Schedule != newCJ.Spec.Schedule {
+		// the schedule changed, so change the requeue time
+		sched, err := cron.ParseStandard(newCJ.Spec.Schedule)
+		if err != nil {
+			// this is likely a user error in defining the spec value;
+			// log the error and do not reconcile this cronjob until the spec is updated
+			klog.V(2).InfoS("unparseable schedule for cronjob", "cronjob", klog.KRef(newCJ.GetNamespace(), newCJ.GetName()), "schedule", newCJ.Spec.Schedule, "err", err)
+			jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)
+			return
+		}
+		now := jm.now()
+		t := nextScheduledTimeDuration(sched, now)
+
+		jm.enqueueControllerAfter(curr, *t)
+		return
+	}
+
+	// Other parameters changed; requeue now. If this sync is triggered within the
+	// starting deadline, the sync loop acts on the CronJob; otherwise the update is
+	// handled at the next scheduled time.
+	// TODO: need to handle the change of spec.JobTemplate.metadata.labels explicitly
+	// to clean up jobs with old labels
+	jm.enqueueController(curr)
+}
+
+// syncCronJob reconciles a CronJob with a list of any Jobs that it created.
+// All known jobs created by "cj" should be included in "js".
+// The current time is passed in to facilitate testing.
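+// Finished Jobs found in "js" are removed from the Active list and the status is
+// updated before any new Job is considered.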
+// It returns a copy of the CronJob that is to be used by other functions +// that mutates the object +func (jm *ControllerV2) syncCronJob( + cj *batchv1beta1.CronJob, + js []batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) { + + cj = cj.DeepCopy() + now := jm.now() + + childrenJobs := make(map[types.UID]bool) + for _, j := range js { + childrenJobs[j.ObjectMeta.UID] = true + found := inActiveList(*cj, j.ObjectMeta.UID) + if !found && !IsJobFinished(&j) { + jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) + // We found an unfinished job that has us as the parent, but it is not in our Active list. + // This could happen if we crashed right after creating the Job and before updating the status, + // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created + // a job that they wanted us to adopt. + } else if found && IsJobFinished(&j) { + _, status := getFinishedStatus(&j) + deleteFromActiveList(cj, j.ObjectMeta.UID) + jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) + } + } + + // Remove any job reference from the active list if the corresponding job does not exist any more. + // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching + // job running. + for _, j := range cj.Status.Active { + _, found := childrenJobs[j.UID] + if found { + continue + } + // Explicitly try to get the job from api-server to avoid a slow watch not able to update + // the job lister on time, giving an unwanted miss + _, err := jm.jobControl.GetJob(j.Namespace, j.Name) + switch { + case errors.IsNotFound(err): + // The job is actually missing, delete from active list and schedule a new one if within + // deadline + jm.recorder.Eventf(cj, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) + deleteFromActiveList(cj, j.UID) + case err != nil: + return cj, nil, err + } + // the job is missing in the lister but found in api-server + } + + updatedCJ, err := jm.cronJobControl.UpdateStatus(cj) + if err != nil { + klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) + return cj, nil, err + } + *cj = *updatedCJ + + if cj.DeletionTimestamp != nil { + // The CronJob is being deleted. + // Don't do anything other than updating status. + return cj, nil, nil + } + + if cj.Spec.Suspend != nil && *cj.Spec.Suspend { + klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + return cj, nil, nil + } + + sched, err := cron.ParseStandard(cj.Spec.Schedule) + if err != nil { + // this is likely a user error in defining the spec value + // we should log the error and not reconcile this cronjob until an update to spec + klog.V(2).InfoS("unparseable schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err) + jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %s : %s", cj.Spec.Schedule, err) + return cj, nil, nil + } + times := getUnmetScheduleTimes(*cj, now, sched, jm.recorder) + if len(times) == 0 { + // no unmet start time, return cj,. + // The only time this should happen is if queue is filled after restart. 
+ // Otherwise, the queue is always suppose to trigger sync function at the time of + // the scheduled time, that will give atleast 1 unmet time schedule + klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + t := nextScheduledTimeDuration(sched, now) + return cj, t, nil + } + + scheduledTime := times[len(times)-1] + tooLate := false + if cj.Spec.StartingDeadlineSeconds != nil { + tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now) + } + if tooLate { + klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + jm.recorder.Eventf(cj, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z)) + + // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing + // the miss every cycle. In order to avoid sending multiple events, and to avoid processing + // the cj again and again, we could set a Status.LastMissedTime when we notice a miss. + // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp, + // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate + // and event the next time we process it, and also so the user looking at the status + // can see easily that there was a missed execution. + t := nextScheduledTimeDuration(sched, now) + return cj, t, nil + } + if isJobInActiveList(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: getJobName(cj, scheduledTime), + Namespace: cj.Namespace, + }}, cj.Status.Active) || cj.Status.LastScheduleTime.Equal(&metav1.Time{Time: scheduledTime}) { + klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", scheduledTime) + t := nextScheduledTimeDuration(sched, now) + return cj, t, nil + } + if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 { + // Regardless which source of information we use for the set of active jobs, + // there is some risk that we won't see an active job when there is one. + // (because we haven't seen the status update to the SJ or the created pod). + // So it is theoretically possible to have concurrency with Forbid. + // As long the as the invocations are "far enough apart in time", this usually won't happen. + // + // TODO: for Forbid, we could use the same name for every execution, as a lock. + // With replace, we could use a name that is deterministic per execution time. + // But that would mean that you could not inspect prior successes or failures of Forbid jobs. 
+ klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + jm.recorder.Eventf(cj, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid") + t := nextScheduledTimeDuration(sched, now) + return cj, t, nil + } + if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { + for _, j := range cj.Status.Active { + klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name)) + + job, err := jm.jobControl.GetJob(j.Namespace, j.Name) + if err != nil { + jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err) + return cj, nil, err + } + if !deleteJob(cj, job, jm.jobControl, jm.recorder) { + return cj, nil, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name) + } + } + } + + jobReq, err := getJobFromTemplate2(cj, scheduledTime) + if err != nil { + klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + return cj, nil, err + } + jobResp, err := jm.jobControl.CreateJob(cj.Namespace, jobReq) + switch { + case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause): + case errors.IsAlreadyExists(err): + klog.InfoS("Job already exists", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName())) + case err != nil: + // default error handling + jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) + return cj, nil, err + } + klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) + + // ------------------------------------------------------------------ // + + // If this process restarts at this point (after posting a job, but + // before updating the status), then we might try to start the job on + // the next time. Actually, if we re-list the SJs and Jobs on the next + // iteration of syncAll, we might not see our own status update, and + // then post one again. So, we need to use the job name as a lock to + // prevent us from making the job twice (name the job with hash of its + // scheduled time). + + // Add the just-started job to the status list. 
+ jobRef, err := getRef(jobResp) + if err != nil { + klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "err", err) + return cj, nil, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cj.GetNamespace(), cj.GetName())) + } + cj.Status.Active = append(cj.Status.Active, *jobRef) + cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} + if _, err := jm.cronJobControl.UpdateStatus(cj); err != nil { + klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) + return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err) + } + + t := nextScheduledTimeDuration(sched, now) + return cj, t, nil +} + +func getJobName(cj *batchv1beta1.CronJob, scheduledTime time.Time) string { + return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime)) +} + +// nextScheduledTimeDuration returns the time duration to requeue based on +// the schedule and current time. It adds a 100ms padding to the next requeue to account +// for Network Time Protocol(NTP) time skews. If the time drifts are adjusted which in most +// realistic cases would be around 100s, scheduled cron will still be executed without missing +// the schedule. +func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duration { + t := sched.Next(now).Add(nextScheduleDelta).Sub(now) + return &t +} + +// cleanupFinishedJobs cleanups finished jobs created by a CronJob +func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job) error { + // If neither limits are active, there is no need to do anything. + if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { + return nil + } + + failedJobs := []batchv1.Job{} + successfulJobs := []batchv1.Job{} + + for _, job := range js { + isFinished, finishedStatus := getFinishedStatus(&job) + if isFinished && finishedStatus == batchv1.JobComplete { + successfulJobs = append(successfulJobs, job) + } else if isFinished && finishedStatus == batchv1.JobFailed { + failedJobs = append(failedJobs, job) + } + } + + if cj.Spec.SuccessfulJobsHistoryLimit != nil { + removeOldestJobs(cj, + successfulJobs, + jm.jobControl, + *cj.Spec.SuccessfulJobsHistoryLimit, + jm.recorder) + } + + if cj.Spec.FailedJobsHistoryLimit != nil { + removeOldestJobs(cj, + failedJobs, + jm.jobControl, + *cj.Spec.FailedJobsHistoryLimit, + jm.recorder) + } + + // Update the CronJob, in case jobs were removed from the list. + _, err := jm.cronJobControl.UpdateStatus(cj) + return err +} + +// isJobInActiveList take a job and checks if activeJobs has a job with the same +// name and namespace. +func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bool { + for _, j := range activeJobs { + if j.Name == job.Name && j.Namespace == job.Namespace { + return true + } + } + return false +} diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go new file mode 100644 index 00000000000..4ba9e68293b --- /dev/null +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -0,0 +1,545 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjob + +import ( + "fmt" + "github.com/robfig/cron" + "k8s.io/apimachinery/pkg/labels" + "reflect" + "strings" + "sync" + "testing" + "time" + + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + batchv1listers "k8s.io/client-go/listers/batch/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + _ "k8s.io/kubernetes/pkg/apis/batch/install" + _ "k8s.io/kubernetes/pkg/apis/core/install" +) + +func justASecondBeforeTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func Test_syncOne2(t *testing.T) { + // Check expectations on deadline parameters + if shortDead/60/60 >= 1 { + t.Errorf("shortDead should be less than one hour") + } + + if mediumDead/60/60 < 1 || mediumDead/60/60 >= 24 { + t.Errorf("mediumDead should be between one hour and one day") + } + + if longDead/60/60/24 < 10 { + t.Errorf("longDead should be at least ten days") + } + + testCases := map[string]struct { + // cj spec + concurrencyPolicy batchv1beta1.ConcurrencyPolicy + suspend bool + schedule string + deadline int64 + + // cj status + ranPreviously bool + stillActive bool + + jobCreationTime time.Time + + // environment + now time.Time + + // expectations + expectCreate bool + expectDelete bool + expectActive int + expectedWarnings int + expectErr bool + expectRequeueAfter bool + jobStillNotFoundInLister bool + }{ + "never ran, not valid schedule, A": {A, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F, F}, + "never ran, not valid schedule, F": {f, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F, F}, + "never ran, not valid schedule, R": {f, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F, F}, + "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, F, F}, + "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, T, F}, + "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + + 
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + "prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + "prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, F, F}, + "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, T, F}, + "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F}, + + "still active, not time, A": {A, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T, F}, + "still active, not time, F": {f, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T, F}, + "still active, not time, R": {R, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T, F}, + "still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, F, 2, 0, F, T, F}, + "still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, T, F}, + "still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, T, 1, 0, F, T, F}, + "still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, F, F}, + "still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, T, F}, + "still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, F, 2, 0, F, T, F}, + + // Controller should fail to schedule these, as there are too many missed starting times + // and either no deadline or a too long deadline. 
+ "prev ran but done, long overdue, not past deadline, A": {A, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F}, + "prev ran but done, long overdue, not past deadline, R": {R, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F}, + "prev ran but done, long overdue, not past deadline, F": {f, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F}, + "prev ran but done, long overdue, no deadline, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F}, + "prev ran but done, long overdue, no deadline, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F}, + "prev ran but done, long overdue, no deadline, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F}, + + "prev ran but done, long overdue, past medium deadline, A": {A, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F}, + "prev ran but done, long overdue, past short deadline, A": {A, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F}, + + "prev ran but done, long overdue, past medium deadline, R": {R, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F}, + "prev ran but done, long overdue, past short deadline, R": {R, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F}, + + "prev ran but done, long overdue, past medium deadline, F": {f, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F}, + "prev ran but done, long overdue, past short deadline, F": {f, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F}, + + // Tests for time skews + "this ran but done, time drifted back, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F}, + + // Tests for slow job lister + "this started but went missing, not past deadline, A": {A, F, onTheHour, longDead, T, T, topOfTheHour().Add(time.Millisecond * 100), justAfterTheHour().Add(time.Millisecond * 100), F, F, 1, 0, F, T, T}, + "this started but went missing, not past deadline, f": {f, F, onTheHour, longDead, T, T, topOfTheHour().Add(time.Millisecond * 100), justAfterTheHour().Add(time.Millisecond * 100), F, F, 1, 0, F, T, T}, + "this started but went missing, not past deadline, R": {R, F, onTheHour, longDead, T, T, topOfTheHour().Add(time.Millisecond * 100), justAfterTheHour().Add(time.Millisecond * 100), F, F, 1, 0, F, T, T}, + + // TODO: alpatel add tests for slow cronjob lister + } + for name, tc := range testCases { + name := name + tc := tc + t.Run(name, func(t *testing.T) { + if name == "this ran but done, time drifted back, F" { + println("hello") + } + cj := cronJob() + cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy + cj.Spec.Suspend = &tc.suspend + cj.Spec.Schedule = tc.schedule + if tc.deadline != noDead { + cj.Spec.StartingDeadlineSeconds = &tc.deadline + } + + var ( + job *batchv1.Job + err error + ) + js := []batchv1.Job{} + if tc.ranPreviously { + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()} + cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()} + job, err = getJobFromTemplate2(&cj, tc.jobCreationTime) + if err != nil { + t.Fatalf("%s: unexpected error 
creating a job from template: %v", name, err) + } + job.UID = "1234" + job.Namespace = cj.Namespace + if tc.stillActive { + ref, err := getRef(job) + if err != nil { + t.Fatalf("%s: unexpected error getting the job object reference: %v", name, err) + } + cj.Status.Active = []v1.ObjectReference{*ref} + if !tc.jobStillNotFoundInLister { + js = append(js, *job) + } + } + } else { + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()} + if tc.stillActive { + t.Errorf("%s: test setup error: this case makes no sense", name) + } + } + + jc := &fakeJobControl{Job: job} + cjc := &fakeCJControl{} + recorder := record.NewFakeRecorder(10) + + jm := ControllerV2{ + jobControl: jc, + cronJobControl: cjc, + recorder: recorder, + now: func() time.Time { + return tc.now + }, + } + cjCopy, requeueAfter, err := jm.syncCronJob(&cj, js) + if tc.expectErr && err == nil { + t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter) + } + if tc.expectRequeueAfter { + sched, err := cron.ParseStandard(tc.schedule) + if err != nil { + t.Errorf("%s: test setup error: the schedule %s is unparseable: %#v", name, tc.schedule, err) + } + expectedRequeueAfter := nextScheduledTimeDuration(sched, tc.now) + if !reflect.DeepEqual(requeueAfter, expectedRequeueAfter) { + t.Errorf("%s: expected requeueAfter: %+v, got requeueAfter time: %+v", name, expectedRequeueAfter, requeueAfter) + } + } + expectedCreates := 0 + if tc.expectCreate { + expectedCreates = 1 + } + if len(jc.Jobs) != expectedCreates { + t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs)) + } + for i := range jc.Jobs { + job := &jc.Jobs[i] + controllerRef := metav1.GetControllerOf(job) + if controllerRef == nil { + t.Errorf("%s: expected job to have ControllerRef: %#v", name, job) + } else { + if got, want := controllerRef.APIVersion, "batch/v1beta1"; got != want { + t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want) + } + if got, want := controllerRef.Kind, "CronJob"; got != want { + t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want) + } + if got, want := controllerRef.Name, cj.Name; got != want { + t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want) + } + if got, want := controllerRef.UID, cj.UID; got != want { + t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("%s: controllerRef.Controller is not set to true", name) + } + } + } + + expectedDeletes := 0 + if tc.expectDelete { + expectedDeletes = 1 + } + if len(jc.DeleteJobName) != expectedDeletes { + t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName)) + } + + // Status update happens once when ranging through job list, and another one if create jobs. 
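+			// (one UpdateStatus call from reconciling the active list, plus one more when a new job is created)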
+ expectUpdates := 1 + expectedEvents := 0 + if tc.expectCreate { + expectedEvents++ + expectUpdates++ + } + if tc.expectDelete { + expectedEvents++ + } + if name == "still active, is time, F" { + // this is the only test case where we would raise an event for not scheduling + expectedEvents++ + } + expectedEvents += tc.expectedWarnings + + if len(recorder.Events) != expectedEvents { + t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events)) + } + + numWarnings := 0 + for i := 1; i <= len(recorder.Events); i++ { + e := <-recorder.Events + if strings.HasPrefix(e, v1.EventTypeWarning) { + numWarnings++ + } + } + if numWarnings != tc.expectedWarnings { + t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings) + } + + if len(cjc.Updates) == expectUpdates && tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) { + t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active)) + } + + if &cj == cjCopy { + t.Errorf("syncCronJob is not creating a copy of the original cronjob") + } + }) + } + +} + +// this test will take around 61 seconds to complete +func TestController2_updateCronJob(t *testing.T) { + cjc := &fakeCJControl{} + jc := &fakeJobControl{} + type fields struct { + queue workqueue.RateLimitingInterface + recorder record.EventRecorder + jobControl jobControlInterface + cronJobControl cjControlInterface + } + type args struct { + oldJobTemplate *batchv1beta1.JobTemplateSpec + newJobTemplate *batchv1beta1.JobTemplateSpec + oldJobSchedule string + newJobSchedule string + } + tests := []struct { + name string + fields fields + args args + deltaTimeForQueue time.Duration + roundOffTimeDuration time.Duration + }{ + { + name: "spec.template changed", + fields: fields{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-update-cronjob"), + recorder: record.NewFakeRecorder(10), + jobControl: jc, + cronJobControl: cjc, + }, + args: args{ + oldJobTemplate: &batchv1beta1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: jobSpec(), + }, + newJobTemplate: &batchv1beta1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"a": "foo"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: jobSpec(), + }, + }, + deltaTimeForQueue: 0 * time.Second, + roundOffTimeDuration: 500*time.Millisecond + nextScheduleDelta, + }, + { + name: "spec.schedule changed", + fields: fields{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-update-cronjob"), + recorder: record.NewFakeRecorder(10), + jobControl: jc, + cronJobControl: cjc, + }, + args: args{ + oldJobSchedule: "30 * * * *", + newJobSchedule: "1 * * * *", + }, + deltaTimeForQueue: 61*time.Second + nextScheduleDelta, + roundOffTimeDuration: 750 * time.Millisecond, + }, + // TODO: Add more test cases for updating scheduling. 
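+		// (for example, a schedule moved from a distant time to an imminent one, which should shorten the requeue delay)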
+ } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cj := cronJob() + newCj := cronJob() + if tt.args.oldJobTemplate != nil { + cj.Spec.JobTemplate = *tt.args.oldJobTemplate + } + if tt.args.newJobTemplate != nil { + newCj.Spec.JobTemplate = *tt.args.newJobTemplate + } + if tt.args.oldJobSchedule != "" { + cj.Spec.Schedule = tt.args.oldJobSchedule + } + if tt.args.newJobSchedule != "" { + newCj.Spec.Schedule = tt.args.newJobSchedule + } + jm := &ControllerV2{ + queue: tt.fields.queue, + recorder: tt.fields.recorder, + jobControl: tt.fields.jobControl, + cronJobControl: tt.fields.cronJobControl, + } + jm.now = justASecondBeforeTheHour + now := time.Now() + then := time.Now() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + now = time.Now() + jm.queue.Get() + then = time.Now() + wg.Done() + return + }() + jm.updateCronJob(&cj, &newCj) + wg.Wait() + d := then.Sub(now) + if d.Round(tt.roundOffTimeDuration).Seconds() != tt.deltaTimeForQueue.Round(tt.roundOffTimeDuration).Seconds() { + t.Errorf("Expected %#v got %#v", tt.deltaTimeForQueue.Round(tt.roundOffTimeDuration).String(), d.Round(tt.roundOffTimeDuration).String()) + } + }) + } +} + +type FakeNamespacedJobLister struct { + jobs []*batchv1.Job + namespace string +} + +func (f *FakeNamespacedJobLister) Get(name string) (*batchv1.Job, error) { + for _, j := range f.jobs { + if j.Namespace == f.namespace && j.Namespace == name { + return j, nil + } + } + return nil, fmt.Errorf("Not Found") +} + +func (f *FakeNamespacedJobLister) List(selector labels.Selector) ([]*batchv1.Job, error) { + ret := []*batchv1.Job{} + for _, j := range f.jobs { + if f.namespace != "" && f.namespace != j.Namespace { + continue + } + if selector.Matches(labels.Set(j.GetLabels())) { + ret = append(ret, j) + } + } + return ret, nil +} + +func (f *FakeNamespacedJobLister) Jobs(namespace string) batchv1listers.JobNamespaceLister { + f.namespace = namespace + return f +} + +func (f *FakeNamespacedJobLister) GetPodJobs(pod *v1.Pod) (jobs []batchv1.Job, err error) { + panic("implement me") +} + +func TestControllerV2_getJobList(t *testing.T) { + trueRef := true + type fields struct { + jobLister batchv1listers.JobLister + } + type args struct { + cronJob *batchv1beta1.CronJob + } + tests := []struct { + name string + fields fields + args args + want []batchv1.Job + wantErr bool + }{ + { + name: "test getting jobs in namespace without controller reference", + fields: fields{ + &FakeNamespacedJobLister{jobs: []*batchv1.Job{ + { + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"}, + }, + }}}, + args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, + want: []batchv1.Job{}, + }, + { + name: "test getting jobs in namespace with a controller reference", + fields: fields{ + &FakeNamespacedJobLister{jobs: []*batchv1.Job{ + { + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "fooer", + Controller: &trueRef, + }, + }}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"}, + }, + }}}, + args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, + want: []batchv1.Job{{ + ObjectMeta: metav1.ObjectMeta{Name: "foo1", 
Namespace: "foo-ns", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "fooer", + Controller: &trueRef, + }, + }}, + }}, + }, + { + name: "test getting jobs in other namespaces ", + fields: fields{ + &FakeNamespacedJobLister{jobs: []*batchv1.Job{ + { + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar-ns"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "bar-ns"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "bar-ns"}, + }, + }}}, + args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, + want: []batchv1.Job{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jm := &ControllerV2{ + jobLister: tt.fields.jobLister, + } + got, err := jm.getJobsToBeReconciled(tt.args.cronJob) + if (err != nil) != tt.wantErr { + t.Errorf("getJobsToBeReconciled() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getJobsToBeReconciled() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/controller/cronjob/injection.go b/pkg/controller/cronjob/injection.go index aeef2805372..db7cb71b91e 100644 --- a/pkg/controller/cronjob/injection.go +++ b/pkg/controller/cronjob/injection.go @@ -35,6 +35,8 @@ import ( // created as an interface to allow testing. type cjControlInterface interface { UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) + // GetCronJob retrieves a CronJob. + GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) } // realCJControl is the default implementation of cjControlInterface. @@ -42,6 +44,10 @@ type realCJControl struct { KubeClient clientset.Interface } +func (c *realCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) { + return c.KubeClient.BatchV1beta1().CronJobs(namespace).Get(context.TODO(), name, metav1.GetOptions{}) +} + var _ cjControlInterface = &realCJControl{} func (c *realCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) { @@ -53,6 +59,10 @@ type fakeCJControl struct { Updates []batchv1beta1.CronJob } +func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) { + panic("implement me") +} + var _ cjControlInterface = &fakeCJControl{} func (c *fakeCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) { diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index 1ffbd5927dd..29dcab3144b 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -25,9 +25,10 @@ import ( batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" - "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" ) // Utilities for dealing with Jobs and CronJobs and time. @@ -45,7 +46,9 @@ func deleteFromActiveList(cj *batchv1beta1.CronJob, uid types.UID) { if cj == nil { return } - newActive := []v1.ObjectReference{} + // TODO: @alpatel the memory footprint can may be reduced here by + // cj.Status.Active = append(cj.Status.Active[:indexToRemove], cj.Status.Active[indexToRemove:]...) 
+	newActive := []corev1.ObjectReference{}
 	for _, j := range cj.Status.Active {
 		if j.UID != uid {
 			newActive = append(newActive, j)
@@ -147,6 +150,71 @@ func getRecentUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time) ([]time
 	return starts, nil
 }
 
+// getUnmetScheduleTimes gets the slice of all the schedule times that were missed between
+// the time a job was last scheduled and `now`.
+//
+// If there are too many (>100) unmet start times, it records a warning event but still
+// returns the full list of missed times.
+func getUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) []time.Time {
+	starts := []time.Time{}
+
+	var earliestTime time.Time
+	if cj.Status.LastScheduleTime != nil {
+		earliestTime = cj.Status.LastScheduleTime.Time
+	} else {
+		// If none found, then this is either a recently created cronJob,
+		// or the active/completed info was somehow lost (contract for status
+		// in kubernetes says it may need to be recreated), or that we have
+		// started a job, but have not noticed it yet (distributed systems can
+		// have arbitrary delays). In any case, use the creation time of the
+		// CronJob as the last known start time.
+		earliestTime = cj.ObjectMeta.CreationTimestamp.Time
+	}
+	if cj.Spec.StartingDeadlineSeconds != nil {
+		// The controller is not going to schedule anything below this point.
+		schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
+
+		if schedulingDeadline.After(earliestTime) {
+			earliestTime = schedulingDeadline
+		}
+	}
+	if earliestTime.After(now) {
+		return []time.Time{}
+	}
+
+	// The loop below could be replaced by a closed-form calculation, roughly:
+	//	t := schedule.Next(earliestTime)
+	//	t1 := schedule.Next(t)
+	//	delta := t1.Sub(t)
+	//	missed := now.Sub(earliestTime) / delta
+	//	lastMissed := earliestTime.Add(delta * (missed - 1))
+	// TODO: @alpatel, convert the following for loop into the above logic and add test cases.
+	for t := schedule.Next(earliestTime); !t.After(now); t = schedule.Next(t) {
+		starts = append(starts, t)
+	}
+	if len(starts) > 100 {
+		// An object might miss several starts. For example, if
+		// the controller gets wedged on Friday at 5:01pm when everyone has
+		// gone home, and someone comes in on Tuesday AM and discovers
+		// the problem and restarts the controller, then all the hourly
+		// jobs, more than 80 of them for one hourly cronJob, should
+		// all start running with no further intervention (if the cronJob
+		// allows concurrency and late starts).
+		//
+		// However, if there is a bug somewhere, or an incorrect clock
+		// on the controller's server or apiservers (for setting creationTimestamp),
+		// then there could be so many missed start times (it could be off
+		// by decades or more) that it would eat up all the CPU and memory
+		// of this controller. In that case, we want to not try to list
+		// all the missed start times.
+		//
+		// I've somewhat arbitrarily picked 100, as more than 80,
+		// but less than "lots".
+		recorder.Eventf(&cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", len(starts))
+		klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missedTimes", len(starts))
+	}
+	return starts
+}
+
 // getJobFromTemplate makes a Job from a CronJob
 func getJobFromTemplate(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
 	labels := copyLabels(&cj.Spec.JobTemplate)
@@ -171,9 +239,35 @@ func getTimeHash(scheduledTime time.Time) int64 {
 	return scheduledTime.Unix()
 }
 
+// getJobFromTemplate2 makes a Job from a CronJob. It converts the scheduled time into minutes
+// from the Unix epoch and appends that to the job name, because the cronjob controller v2
+// schedules jobs with a minimum granularity of one minute.
+func getJobFromTemplate2(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
+	labels := copyLabels(&cj.Spec.JobTemplate)
+	annotations := copyAnnotations(&cj.Spec.JobTemplate)
+	// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
+	name := getJobName(cj, scheduledTime)
+
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels:          labels,
+			Annotations:     annotations,
+			Name:            name,
+			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
+		},
+	}
+	cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
+	return job, nil
+}
+
+// getTimeHashInMinutes returns the Unix epoch time in minutes.
+func getTimeHashInMinutes(scheduledTime time.Time) int64 {
+	return scheduledTime.Unix() / 60
+}
+
 func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
 	for _, c := range j.Status.Conditions {
-		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
+		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
 			return true, c.Type
 		}
 	}
diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go
index 8df0f3fe223..fb88926a21a 100644
--- a/pkg/controller/cronjob/utils_test.go
+++ b/pkg/controller/cronjob/utils_test.go
@@ -23,11 +23,13 @@ import (
 	"testing"
 	"time"
 
+	cron "github.com/robfig/cron"
 	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/record"
 	utilpointer "k8s.io/utils/pointer"
 )
@@ -89,6 +91,64 @@ func TestGetJobFromTemplate(t *testing.T) {
 	}
 }
 
+func TestGetJobFromTemplate2(t *testing.T) {
+	// getJobFromTemplate2() needs to take the job template, copy the labels and annotations
+	// and other fields, and add a controller owner reference.
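+	//
+	// As an illustration of the naming scheme (assuming the suffix comes from
+	// getTimeHashInMinutes), a CronJob named "mycronjob" with a nominal start time of
+	// 2016-05-19T10:00:00Z (Unix 1463652000) would get a job name like
+	// "mycronjob-24394200", i.e. 1463652000 / 60.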
+ + var one int64 = 1 + var no bool + + cj := batchv1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mycronjob", + Namespace: "snazzycats", + UID: types.UID("1a2b3c"), + SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob", + }, + Spec: batchv1beta1.CronJobSpec{ + Schedule: "* * * * ?", + ConcurrencyPolicy: batchv1beta1.AllowConcurrent, + JobTemplate: batchv1beta1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: batchv1.JobSpec{ + ActiveDeadlineSeconds: &one, + ManualSelector: &no, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + }, + }, + } + + var job *batchv1.Job + job, err := getJobFromTemplate2(&cj, time.Time{}) + if err != nil { + t.Errorf("Did not expect error: %s", err) + } + if !strings.HasPrefix(job.ObjectMeta.Name, "mycronjob-") { + t.Errorf("Wrong Name") + } + if len(job.ObjectMeta.Labels) != 1 { + t.Errorf("Wrong number of labels") + } + if len(job.ObjectMeta.Annotations) != 1 { + t.Errorf("Wrong number of annotations") + } +} + func TestGetParentUIDFromJob(t *testing.T) { j := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -242,6 +302,137 @@ func TestGroupJobsByParent(t *testing.T) { } } +func TestGetLatestUnmetScheduleTimes(t *testing.T) { + // schedule is hourly on the hour + schedule := "0 * * * ?" + + PraseSchedule := func(schedule string) cron.Schedule { + sched, err := cron.ParseStandard(schedule) + if err != nil { + t.Errorf("Error parsing schedule: %#v", err) + return nil + } + return sched + } + recorder := record.NewFakeRecorder(50) + // T1 is a scheduled start time of that schedule + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") + if err != nil { + t.Errorf("test setup error: %v", err) + } + // T2 is a scheduled start time of that schedule after T1 + T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z") + if err != nil { + t.Errorf("test setup error: %v", err) + } + + cj := batchv1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mycronjob", + Namespace: metav1.NamespaceDefault, + UID: types.UID("1a2b3c"), + }, + Spec: batchv1beta1.CronJobSpec{ + Schedule: schedule, + ConcurrencyPolicy: batchv1beta1.AllowConcurrent, + JobTemplate: batchv1beta1.JobTemplateSpec{}, + }, + } + { + // Case 1: no known start times, and none needed yet. + // Creation time is before T1. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} + // Current time is more than creation time, but less than T1. + now := T1.Add(-7 * time.Minute) + times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder) + if len(times) != 0 { + t.Errorf("expected no start times, got: %v", times) + } + } + { + // Case 2: no known start times, and one needed. + // Creation time is before T1. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} + // Current time is after T1 + now := T1.Add(2 * time.Second) + times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder) + if len(times) != 1 { + t.Errorf("expected 1 start time, got: %v", times) + } else if !times[0].Equal(T1) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + } + { + // Case 3: known LastScheduleTime, no start needed. + // Creation time is before T1. 
+		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
+		// Status shows a start at the expected time.
+		cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
+		// Current time is after T1
+		now := T1.Add(2 * time.Minute)
+		times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		if len(times) != 0 {
+			t.Errorf("expected 0 start times, got: %v", times)
+		}
+	}
+	{
+		// Case 4: known LastScheduleTime, a start needed.
+		// Creation time is before T1.
+		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
+		// Status shows a start at the expected time.
+		cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
+		// Current time is after T1 and after T2
+		now := T2.Add(5 * time.Minute)
+		times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		if len(times) != 1 {
+			t.Errorf("expected 1 start time, got: %v", times)
+		} else if !times[0].Equal(T2) {
+			t.Errorf("expected: %v, got: %v", T2, times[0])
+		}
+	}
+	{
+		// Case 5: known LastScheduleTime, two starts needed
+		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
+		cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
+		// Current time is after T1 and after T2
+		now := T2.Add(5 * time.Minute)
+		times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		if len(times) != 2 {
+			t.Errorf("expected 2 start times, got: %v", times)
+		} else {
+			if !times[0].Equal(T1) {
+				t.Errorf("expected: %v, got: %v", T1, times[0])
+			}
+			if !times[1].Equal(T2) {
+				t.Errorf("expected: %v, got: %v", T2, times[1])
+			}
+		}
+	}
+	{
+		// Case 6: now is way ahead of the last start time, and there is no deadline.
+		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
+		cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
+		now := T2.Add(10 * 24 * time.Hour)
+		times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		if len(times) == 0 {
+			t.Errorf("expected more than 0 missed times")
+		}
+	}
+	{
+		// Case 7: now is way ahead of the last start time, but there is a short deadline.
+		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
+		cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
+		now := T2.Add(10 * 24 * time.Hour)
+		// Deadline is short
+		deadline := int64(2 * 60 * 60)
+		cj.Spec.StartingDeadlineSeconds = &deadline
+		times := getUnmetScheduleTimes(cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		if len(times) == 0 {
+			t.Errorf("expected more than 0 missed times")
+		}
+	}
+}
+
 func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 	// schedule is hourly on the hour
 	schedule := "0 * * * ?"
diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 4dc3f09ee73..24569684005 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -560,6 +560,16 @@ const (
 	// Enable all logic related to the PodDisruptionBudget API object in policy
 	PodDisruptionBudget featuregate.Feature = "PodDisruptionBudget"
 
+	// owner: @alaypatel07, @soltysh
+	// alpha: v1.20
+	// beta: v1.21
+	//
+	// CronJobControllerV2 controls whether the controller manager starts the old cronjob
+	// controller or the new one, which is implemented with informers and a delaying queue.
+	//
+	// This feature is deprecated, and will be removed in v1.22.
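+	//
+	// For example, the v2 controller can be tried out by enabling the gate on the
+	// controller manager: kube-controller-manager --feature-gates=CronJobControllerV2=true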
+	CronJobControllerV2 featuregate.Feature = "CronJobControllerV2"
+
 	// owner: @m1093782566
 	// alpha: v1.17
 	//
@@ -754,6 +764,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	StartupProbe:                   {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23
 	AllowInsecureBackendProxy:      {Default: true, PreRelease: featuregate.Beta},
 	PodDisruptionBudget:            {Default: true, PreRelease: featuregate.Beta},
+	CronJobControllerV2:            {Default: false, PreRelease: featuregate.Alpha},
 	ServiceTopology:                {Default: false, PreRelease: featuregate.Alpha},
 	ServiceAppProtocol:             {Default: true, PreRelease: featuregate.Beta},
 	ImmutableEphemeralVolumes:      {Default: true, PreRelease: featuregate.Beta},
diff --git a/test/e2e/apps/cronjob.go b/test/e2e/apps/cronjob.go
index 228bf6944ff..a5fea77ce4c 100644
--- a/test/e2e/apps/cronjob.go
+++ b/test/e2e/apps/cronjob.go
@@ -170,6 +170,32 @@ var _ = SIGDescribe("CronJob", func() {
 		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
 	})
 
+	ginkgo.It("should be able to schedule after more than 100 missed schedules", func() {
+		ginkgo.By("Creating a cronjob")
+		cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1beta1.ForbidConcurrent,
+			sleepCommand, nil, nil)
+		creationTime := time.Now().Add(-99 * 24 * time.Hour)
+		lastScheduleTime := creationTime.Add(-1 * 24 * time.Hour)
+		cronJob.CreationTimestamp = metav1.Time{Time: creationTime}
+		cronJob.Status.LastScheduleTime = &metav1.Time{Time: lastScheduleTime}
+		cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
+		framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
+
+		ginkgo.By("Ensuring one job is running")
+		err = waitForActiveJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
+		framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
+
+		ginkgo.By("Ensuring at least one running job exists by listing jobs explicitly")
+		jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
+		framework.ExpectNoError(err, "Failed to list the Jobs in namespace %s", f.Namespace.Name)
+		activeJobs, _ := filterActiveJobs(jobs)
+		gomega.Expect(len(activeJobs)).To(gomega.BeNumerically(">=", 1))
+
+		ginkgo.By("Removing cronjob")
+		err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
+		framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
+	})
+
 	// shouldn't give us unexpected warnings
 	ginkgo.It("should not emit unexpected warnings", func() {
 		ginkgo.By("Creating a cronjob")