add cronjob_controllerv2.go

Alay Patel 2020-10-11 02:49:11 -04:00
parent 1d4c0ad6f3
commit 8d7dd4415e
12 changed files with 1616 additions and 4 deletions

View File

@@ -22,12 +22,13 @@ package app
import (
"fmt"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job"
kubefeatures "k8s.io/kubernetes/pkg/features"
)
func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
@@ -46,6 +47,17 @@ func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
return nil, false, nil
}
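// If the CronJobControllerV2 feature gate is enabled, run the informer-driven v2 controller
// below instead of the existing CronJob controller constructed further down.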
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) {
cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(),
ctx.InformerFactory.Batch().V1beta1().CronJobs(),
ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err)
}
go cj2c.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop)
return nil, true, nil
}
cjc, err := cronjob.NewController(
ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
)

View File

@@ -0,0 +1,56 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"github.com/spf13/pflag"
cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config"
)
// CronJobControllerOptions holds the CronJobController options.
type CronJobControllerOptions struct {
*cronjobconfig.CronJobControllerConfiguration
}
// AddFlags adds flags related to JobController for controller manager to the specified FlagSet.
func (o *CronJobControllerOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
}
// ApplyTo fills up JobController config with options.
func (o *CronJobControllerOptions) ApplyTo(cfg *cronjobconfig.CronJobControllerConfiguration) error {
if o == nil {
return nil
}
cfg.ConcurrentCronJobSyncs = o.ConcurrentCronJobSyncs
return nil
}
// Validate checks validation of CronJobControllerOptions.
func (o *CronJobControllerOptions) Validate() []error {
if o == nil {
return nil
}
errs := []error{}
return errs
}

View File

@@ -72,6 +72,7 @@ type KubeControllerManagerOptions struct {
GarbageCollectorController *GarbageCollectorControllerOptions
HPAController *HPAControllerOptions
JobController *JobControllerOptions
CronJobController *CronJobControllerOptions
NamespaceController *NamespaceControllerOptions
NodeIPAMController *NodeIPAMControllerOptions
NodeLifecycleController *NodeLifecycleControllerOptions
@@ -145,6 +146,9 @@ func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
JobController: &JobControllerOptions{
&componentConfig.JobController,
},
CronJobController: &CronJobControllerOptions{
&componentConfig.CronJobController,
},
NamespaceController: &NamespaceControllerOptions{
&componentConfig.NamespaceController,
},
@@ -245,6 +249,7 @@ func (s *KubeControllerManagerOptions) Flags(allControllers []string, disabledBy
s.GarbageCollectorController.AddFlags(fss.FlagSet("garbagecollector controller"))
s.HPAController.AddFlags(fss.FlagSet("horizontalpodautoscaling controller"))
s.JobController.AddFlags(fss.FlagSet("job controller"))
s.CronJobController.AddFlags(fss.FlagSet("cronjob controller"))
s.NamespaceController.AddFlags(fss.FlagSet("namespace controller"))
s.NodeIPAMController.AddFlags(fss.FlagSet("nodeipam controller"))
s.NodeLifecycleController.AddFlags(fss.FlagSet("nodelifecycle controller"))
@@ -310,6 +315,9 @@ func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config) e
if err := s.JobController.ApplyTo(&c.ComponentConfig.JobController); err != nil {
return err
}
if err := s.CronJobController.ApplyTo(&c.ComponentConfig.CronJobController); err != nil {
return err
}
if err := s.NamespaceController.ApplyTo(&c.ComponentConfig.NamespaceController); err != nil {
return err
}
@@ -384,6 +392,7 @@ func (s *KubeControllerManagerOptions) Validate(allControllers []string, disable
errs = append(errs, s.GarbageCollectorController.Validate()...)
errs = append(errs, s.HPAController.Validate()...)
errs = append(errs, s.JobController.Validate()...)
errs = append(errs, s.CronJobController.Validate()...)
errs = append(errs, s.NamespaceController.Validate()...)
errs = append(errs, s.NodeIPAMController.Validate()...)
errs = append(errs, s.NodeLifecycleController.Validate()...)

View File

@@ -38,6 +38,7 @@ import (
kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config"
cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config"
daemonconfig "k8s.io/kubernetes/pkg/controller/daemon/config"
deploymentconfig "k8s.io/kubernetes/pkg/controller/deployment/config"
endpointconfig "k8s.io/kubernetes/pkg/controller/endpoint/config"
@@ -314,6 +315,11 @@ func TestAddFlags(t *testing.T) {
ConcurrentJobSyncs: 5,
},
},
CronJobController: &CronJobControllerOptions{
&cronjobconfig.CronJobControllerConfiguration{
ConcurrentCronJobSyncs: 5,
},
},
NamespaceController: &NamespaceControllerOptions{
&namespaceconfig.NamespaceControllerConfiguration{
NamespaceSyncPeriod: metav1.Duration{Duration: 10 * time.Minute},
@@ -566,6 +572,9 @@ func TestApplyTo(t *testing.T) {
JobController: jobconfig.JobControllerConfiguration{
ConcurrentJobSyncs: 5,
},
CronJobController: cronjobconfig.CronJobControllerConfiguration{
ConcurrentCronJobSyncs: 5,
},
NamespaceController: namespaceconfig.NamespaceControllerConfiguration{
NamespaceSyncPeriod: metav1.Duration{Duration: 10 * time.Minute},
ConcurrentNamespaceSyncs: 20,

View File

@@ -50,6 +50,7 @@ pkg/controller/certificates
pkg/controller/certificates/signer
pkg/controller/certificates/signer/config/v1alpha1
pkg/controller/clusterroleaggregation
pkg/controller/cronjob/config/v1alpha1
pkg/controller/daemon
pkg/controller/daemon/config/v1alpha1
pkg/controller/deployment

View File

@@ -0,0 +1,648 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cronjob
import (
"fmt"
"reflect"
"time"
"github.com/robfig/cron"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
batchv1informers "k8s.io/client-go/informers/batch/v1"
batchv1beta1informers "k8s.io/client-go/informers/batch/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
batchv1listers "k8s.io/client-go/listers/batch/v1"
batchv1beta1listers "k8s.io/client-go/listers/batch/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
)
var (
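// nextScheduleDelta is extra padding added to the computed requeue delay so that
// small clock skews do not cause the sync to fire just before the scheduled time.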
nextScheduleDelta = 100 * time.Millisecond
)
// ControllerV2 is a controller for CronJobs.
// It is a refactored CronJob controller that uses a delaying work queue and informers.
type ControllerV2 struct {
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
jobControl jobControlInterface
cronJobControl cjControlInterface
jobLister batchv1listers.JobLister
cronJobLister batchv1beta1listers.CronJobLister
jobListerSynced cache.InformerSynced
cronJobListerSynced cache.InformerSynced
// now is a function that returns current time, done to facilitate unit tests
now func() time.Time
}
// NewControllerV2 creates and initializes a new Controller.
func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1beta1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
jm := &ControllerV2{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
jobControl: realJobControl{KubeClient: kubeClient},
cronJobControl: &realCJControl{KubeClient: kubeClient},
jobLister: jobInformer.Lister(),
cronJobLister: cronJobsInformer.Lister(),
jobListerSynced: jobInformer.Informer().HasSynced,
cronJobListerSynced: cronJobsInformer.Informer().HasSynced,
now: time.Now,
}
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addJob,
UpdateFunc: jm.updateJob,
DeleteFunc: jm.deleteJob,
})
cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
jm.enqueueController(obj)
},
UpdateFunc: jm.updateCronJob,
DeleteFunc: func(obj interface{}) {
jm.enqueueController(obj)
},
})
return jm, nil
}
// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer jm.queue.ShutDown()
klog.Infof("Starting cronjob controller v2")
defer klog.Infof("Shutting down cronjob controller v2")
if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh)
}
<-stopCh
}
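// worker runs a single worker goroutine that keeps processing items off the queue
// until the queue shuts down.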
func (jm *ControllerV2) worker() {
for jm.processNextWorkItem() {
}
}
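// processNextWorkItem pops one key off the queue and syncs the corresponding CronJob.
// On error the key is requeued with rate limiting; otherwise, if the sync returned a
// delay, the key is re-added after that delay.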
func (jm *ControllerV2) processNextWorkItem() bool {
key, quit := jm.queue.Get()
if quit {
return false
}
defer jm.queue.Done(key)
requeueAfter, err := jm.sync(key.(string))
switch {
case err != nil:
utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))
jm.queue.AddRateLimited(key)
case requeueAfter != nil:
jm.queue.Forget(key)
jm.queue.AddAfter(key, *requeueAfter)
}
return true
}
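// sync resolves the CronJob for the given key, reconciles it against the Jobs it owns,
// cleans up finished Jobs, and returns an optional duration after which the key should
// be synced again.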
func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) {
ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
if err != nil {
return nil, err
}
cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
switch {
case errors.IsNotFound(err):
// the cronjob may have been deleted; no need to requeue this key
klog.V(4).InfoS("cronjob not found, maybe it was deleted", "cronjob", klog.KRef(ns, name), "err", err)
return nil, nil
case err != nil:
// for other transient apiserver error requeue with exponential backoff
return nil, err
}
jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)
if err != nil {
return nil, err
}
cronJob, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled)
if err != nil {
klog.V(2).InfoS("error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
return nil, err
}
err = jm.cleanupFinishedJobs(cronJob, jobsToBeReconciled)
if err != nil {
klog.V(2).InfoS("error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err)
return nil, err
}
if requeueAfter != nil {
klog.V(4).InfoS("re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter)
return requeueAfter, nil
}
// this marks the key as done; currently this only happens when the cronjob is suspended or the spec has an invalid schedule format
return nil, nil
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1beta1.CronJob {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
cronJob, err := jm.cronJobLister.CronJobs(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if cronJob.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return cronJob
}
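// getJobsToBeReconciled lists Jobs in the CronJob's namespace that match the job
// template's labels and keeps only those controlled by this CronJob.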
func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]batchv1.Job, error) {
var jobSelector labels.Selector
if len(cronJob.Spec.JobTemplate.Labels) == 0 {
jobSelector = labels.Everything()
} else {
jobSelector = labels.Set(cronJob.Spec.JobTemplate.Labels).AsSelector()
}
jobList, err := jm.jobLister.Jobs(cronJob.Namespace).List(jobSelector)
if err != nil {
return nil, err
}
jobsToBeReconciled := []batchv1.Job{}
for _, job := range jobList {
// If it has a ControllerRef, that's all that matters.
if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name {
// this job needs to be reconciled
jobsToBeReconciled = append(jobsToBeReconciled, *job)
}
}
return jobsToBeReconciled, nil
}
// When a job is created, enqueue the controller that manages it and update its expectations.
func (jm *ControllerV2) addJob(obj interface{}) {
job := obj.(*batchv1.Job)
if job.DeletionTimestamp != nil {
// on a restart of the controller, it's possible a new job shows up in a state that
// is already pending deletion. Prevent the job from being a creation observation.
jm.deleteJob(job)
return
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := metav1.GetControllerOf(job); controllerRef != nil {
cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
if cronJob == nil {
return
}
jm.enqueueController(cronJob)
return
}
}
// updateJob figures out what CronJob(s) manage a Job when the Job
// is updated and wakes them up. If anything on the Job has changed, we need to
// awaken both the old and new CronJob. old and cur must be *batchv1.Job
// types.
func (jm *ControllerV2) updateJob(old, cur interface{}) {
curJob := cur.(*batchv1.Job)
oldJob := old.(*batchv1.Job)
if curJob.ResourceVersion == oldJob.ResourceVersion {
// Periodic resync will send update events for all known jobs.
// Two different versions of the same jobs will always have different RVs.
return
}
curControllerRef := metav1.GetControllerOf(curJob)
oldControllerRef := metav1.GetControllerOf(oldJob)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if cronJob := jm.resolveControllerRef(oldJob.Namespace, oldControllerRef); cronJob != nil {
jm.enqueueController(cronJob)
}
}
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
cronJob := jm.resolveControllerRef(curJob.Namespace, curControllerRef)
if cronJob == nil {
return
}
jm.enqueueController(cronJob)
return
}
}
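// deleteJob enqueues the CronJob that owns a deleted Job, handling the tombstone case
// where the watch missed the delete event.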
func (jm *ControllerV2) deleteJob(obj interface{}) {
job, ok := obj.(*batchv1.Job)
// When a delete is dropped, the relist will notice a job in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
job, ok = tombstone.Obj.(*batchv1.Job)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
return
}
}
controllerRef := metav1.GetControllerOf(job)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)
if cronJob == nil {
return
}
jm.enqueueController(cronJob)
}
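// enqueueController adds the key for the given object to the work queue for immediate processing.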
func (jm *ControllerV2) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
jm.queue.Add(key)
}
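// enqueueControllerAfter adds the key for the given object to the work queue after the supplied delay.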
func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
jm.queue.AddAfter(key, t)
}
// updateCronJob re-queues the CronJob for next scheduled time if there is a
// change in spec.schedule otherwise it re-queues it now
func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
oldCJ, okOld := old.(*batchv1beta1.CronJob)
newCJ, okNew := curr.(*batchv1beta1.CronJob)
if !okOld || !okNew {
// the type assertion failed for one of the objects; TODO: handle this better, maybe with a log entry
return
}
// if the change in schedule means the next requeue has to be sooner than it already was,
// that is handled here by the queue. If the next requeue is later than the previous schedule,
// the sync loop will essentially be a no-op for the already-queued key with the old schedule.
if oldCJ.Spec.Schedule != newCJ.Spec.Schedule {
// schedule changed, change the requeue time
sched, err := cron.ParseStandard(newCJ.Spec.Schedule)
if err != nil {
// this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("unparseable schedule for cronjob", "cronjob", klog.KRef(newCJ.GetNamespace(), newCJ.GetName()), "schedule", newCJ.Spec.Schedule, "err", err)
jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)
return
}
now := jm.now()
t := nextScheduledTimeDuration(sched, now)
jm.enqueueControllerAfter(curr, *t)
return
}
// other parameters changed; requeue now. If this gets triggered within the
// deadline, the sync loop will act on the CronJob; otherwise the updates will be
// handled during the next schedule
// TODO: need to handle the change of spec.JobTemplate.metadata.labels explicitly
// to clean up jobs with old labels
jm.enqueueController(curr)
}
// syncCronJob reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cj" should be included in "js".
// The current time is passed in to facilitate testing.
// It returns a copy of the CronJob that is to be used by other functions
// that mutate the object
func (jm *ControllerV2) syncCronJob(
cj *batchv1beta1.CronJob,
js []batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) {
cj = cj.DeepCopy()
now := jm.now()
childrenJobs := make(map[types.UID]bool)
for _, j := range js {
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(*cj, j.ObjectMeta.UID)
if !found && !IsJobFinished(&j) {
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
// We found an unfinished job that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.
} else if found && IsJobFinished(&j) {
_, status := getFinishedStatus(&j)
deleteFromActiveList(cj, j.ObjectMeta.UID)
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
}
}
// Remove any job reference from the active list if the corresponding job does not exist any more.
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
// job running.
for _, j := range cj.Status.Active {
_, found := childrenJobs[j.UID]
if found {
continue
}
// Explicitly try to get the job from the api-server, in case a slow watch has not yet
// updated the job lister, which would otherwise give an unwanted miss
_, err := jm.jobControl.GetJob(j.Namespace, j.Name)
switch {
case errors.IsNotFound(err):
// The job is actually missing, delete from active list and schedule a new one if within
// deadline
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(cj, j.UID)
case err != nil:
return cj, nil, err
}
// the job is missing in the lister but found in api-server
}
updatedCJ, err := jm.cronJobControl.UpdateStatus(cj)
if err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
return cj, nil, err
}
*cj = *updatedCJ
if cj.DeletionTimestamp != nil {
// The CronJob is being deleted.
// Don't do anything other than updating status.
return cj, nil, nil
}
if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
return cj, nil, nil
}
sched, err := cron.ParseStandard(cj.Spec.Schedule)
if err != nil {
// this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("unparseable schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err)
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %s : %s", cj.Spec.Schedule, err)
return cj, nil, nil
}
times := getUnmetScheduleTimes(*cj, now, sched, jm.recorder)
if len(times) == 0 {
// no unmet start times; return cj.
// The only time this should happen is if the queue is filled after a restart.
// Otherwise, the queue is supposed to trigger the sync function at the scheduled
// time, which will yield at least one unmet schedule time.
klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
t := nextScheduledTimeDuration(sched, now)
return cj, t, nil
}
scheduledTime := times[len(times)-1]
tooLate := false
if cj.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
// Status.LastScheduleTime, Status.LastMissedTime), so that we won't generate
// an event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.
t := nextScheduledTimeDuration(sched, now)
return cj, t, nil
}
if isJobInActiveList(&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: getJobName(cj, scheduledTime),
Namespace: cj.Namespace,
}}, cj.Status.Active) || cj.Status.LastScheduleTime.Equal(&metav1.Time{Time: scheduledTime}) {
klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", scheduledTime)
t := nextScheduledTimeDuration(sched, now)
return cj, t, nil
}
if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the CronJob or the created pod).
// So it is theoretically possible to have concurrency with Forbid.
// As long as the invocations are "far enough apart in time", this usually won't happen.
//
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
t := nextScheduledTimeDuration(sched, now)
return cj, t, nil
}
if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
for _, j := range cj.Status.Active {
klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
if err != nil {
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return cj, nil, err
}
if !deleteJob(cj, job, jm.jobControl, jm.recorder) {
return cj, nil, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
}
}
}
jobReq, err := getJobFromTemplate2(cj, scheduledTime)
if err != nil {
klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
return cj, nil, err
}
jobResp, err := jm.jobControl.CreateJob(cj.Namespace, jobReq)
switch {
case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
case errors.IsAlreadyExists(err):
klog.InfoS("Job already exists", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
case err != nil:
// default error handling
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return cj, nil, err
}
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ //
// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job on
// the next time. Actually, if we re-list the CronJobs and Jobs on the next
// iteration of syncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice (name the job with hash of its
// scheduled time).
// Add the just-started job to the status list.
jobRef, err := getRef(jobResp)
if err != nil {
klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "err", err)
return cj, nil, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cj.GetNamespace(), cj.GetName()))
}
cj.Status.Active = append(cj.Status.Active, *jobRef)
cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
if _, err := jm.cronJobControl.UpdateStatus(cj); err != nil {
klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err)
}
t := nextScheduledTimeDuration(sched, now)
return cj, t, nil
}
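// getJobName builds a deterministic child Job name from the CronJob name and the
// scheduled time hashed to minutes, so the same nominal start time always maps to the
// same Job name.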
func getJobName(cj *batchv1beta1.CronJob, scheduledTime time.Time) string {
return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime))
}
// nextScheduledTimeDuration returns the time duration to requeue based on
// the schedule and current time. It adds a 100ms padding to the next requeue to account
// for Network Time Protocol (NTP) time skews. If the time drifts are adjusted, which in most
// realistic cases would be around 100s, the scheduled cron will still be executed without missing
// the schedule.
func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duration {
t := sched.Next(now).Add(nextScheduleDelta).Sub(now)
return &t
}
// cleanupFinishedJobs cleans up finished jobs created by a CronJob
func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job) error {
// If neither history limit is set, there is no need to do anything.
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
return nil
}
failedJobs := []batchv1.Job{}
successfulJobs := []batchv1.Job{}
for _, job := range js {
isFinished, finishedStatus := getFinishedStatus(&job)
if isFinished && finishedStatus == batchv1.JobComplete {
successfulJobs = append(successfulJobs, job)
} else if isFinished && finishedStatus == batchv1.JobFailed {
failedJobs = append(failedJobs, job)
}
}
if cj.Spec.SuccessfulJobsHistoryLimit != nil {
removeOldestJobs(cj,
successfulJobs,
jm.jobControl,
*cj.Spec.SuccessfulJobsHistoryLimit,
jm.recorder)
}
if cj.Spec.FailedJobsHistoryLimit != nil {
removeOldestJobs(cj,
failedJobs,
jm.jobControl,
*cj.Spec.FailedJobsHistoryLimit,
jm.recorder)
}
// Update the CronJob, in case jobs were removed from the list.
_, err := jm.cronJobControl.UpdateStatus(cj)
return err
}
// isJobInActiveList takes a job and checks if activeJobs has a job with the same
// name and namespace.
func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bool {
for _, j := range activeJobs {
if j.Name == job.Name && j.Namespace == job.Namespace {
return true
}
}
return false
}

View File

@@ -0,0 +1,545 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cronjob
import (
"fmt"
"github.com/robfig/cron"
"k8s.io/apimachinery/pkg/labels"
"reflect"
"strings"
"sync"
"testing"
"time"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchv1listers "k8s.io/client-go/listers/batch/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
)
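// justASecondBeforeTheHour returns a fixed time one second before the top of the hour
// (2016-05-19T10:00:00Z); the update test below injects it as the controller's clock.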
func justASecondBeforeTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z")
if err != nil {
panic("test setup error")
}
return T1
}
func Test_syncOne2(t *testing.T) {
// Check expectations on deadline parameters
if shortDead/60/60 >= 1 {
t.Errorf("shortDead should be less than one hour")
}
if mediumDead/60/60 < 1 || mediumDead/60/60 >= 24 {
t.Errorf("mediumDead should be between one hour and one day")
}
if longDead/60/60/24 < 10 {
t.Errorf("longDead should be at least ten days")
}
testCases := map[string]struct {
// cj spec
concurrencyPolicy batchv1beta1.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
// cj status
ranPreviously bool
stillActive bool
jobCreationTime time.Time
// environment
now time.Time
// expectations
expectCreate bool
expectDelete bool
expectActive int
expectedWarnings int
expectErr bool
expectRequeueAfter bool
jobStillNotFoundInLister bool
}{
"never ran, not valid schedule, A": {A, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F, F},
"never ran, not valid schedule, F": {f, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F, F},
"never ran, not valid schedule, R": {f, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F, F},
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, T, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, T, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T, F},
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T, F},
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T, F},
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T, F},
"still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, F, 2, 0, F, T, F},
"still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, T, F},
"still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, T, 1, 0, F, T, F},
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, F, F},
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, T, F},
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, F, 2, 0, F, T, F},
// Controller should fail to schedule these, as there are too many missed starting times
// and either no deadline or a too long deadline.
"prev ran but done, long overdue, not past deadline, A": {A, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F},
"prev ran but done, long overdue, not past deadline, R": {R, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F},
"prev ran but done, long overdue, not past deadline, F": {f, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F},
"prev ran but done, long overdue, no deadline, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F},
"prev ran but done, long overdue, no deadline, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F},
"prev ran but done, long overdue, no deadline, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 1, F, T, F},
"prev ran but done, long overdue, past medium deadline, A": {A, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, long overdue, past short deadline, A": {A, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, long overdue, past medium deadline, R": {R, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, long overdue, past short deadline, R": {R, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, long overdue, past medium deadline, F": {f, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F},
"prev ran but done, long overdue, past short deadline, F": {f, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T, F},
// Tests for time skews
"this ran but done, time drifted back, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), justBeforeTheHour(), F, F, 0, 0, F, T, F},
// Tests for slow job lister
"this started but went missing, not past deadline, A": {A, F, onTheHour, longDead, T, T, topOfTheHour().Add(time.Millisecond * 100), justAfterTheHour().Add(time.Millisecond * 100), F, F, 1, 0, F, T, T},
"this started but went missing, not past deadline, f": {f, F, onTheHour, longDead, T, T, topOfTheHour().Add(time.Millisecond * 100), justAfterTheHour().Add(time.Millisecond * 100), F, F, 1, 0, F, T, T},
"this started but went missing, not past deadline, R": {R, F, onTheHour, longDead, T, T, topOfTheHour().Add(time.Millisecond * 100), justAfterTheHour().Add(time.Millisecond * 100), F, F, 1, 0, F, T, T},
// TODO: alpatel add tests for slow cronjob lister
}
for name, tc := range testCases {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
if name == "this ran but done, time drifted back, F" {
println("hello")
}
cj := cronJob()
cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
cj.Spec.Suspend = &tc.suspend
cj.Spec.Schedule = tc.schedule
if tc.deadline != noDead {
cj.Spec.StartingDeadlineSeconds = &tc.deadline
}
var (
job *batchv1.Job
err error
)
js := []batchv1.Job{}
if tc.ranPreviously {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
job, err = getJobFromTemplate2(&cj, tc.jobCreationTime)
if err != nil {
t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
}
job.UID = "1234"
job.Namespace = cj.Namespace
if tc.stillActive {
ref, err := getRef(job)
if err != nil {
t.Fatalf("%s: unexpected error getting the job object reference: %v", name, err)
}
cj.Status.Active = []v1.ObjectReference{*ref}
if !tc.jobStillNotFoundInLister {
js = append(js, *job)
}
}
} else {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
if tc.stillActive {
t.Errorf("%s: test setup error: this case makes no sense", name)
}
}
jc := &fakeJobControl{Job: job}
cjc := &fakeCJControl{}
recorder := record.NewFakeRecorder(10)
jm := ControllerV2{
jobControl: jc,
cronJobControl: cjc,
recorder: recorder,
now: func() time.Time {
return tc.now
},
}
cjCopy, requeueAfter, err := jm.syncCronJob(&cj, js)
if tc.expectErr && err == nil {
t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter)
}
if tc.expectRequeueAfter {
sched, err := cron.ParseStandard(tc.schedule)
if err != nil {
t.Errorf("%s: test setup error: the schedule %s is unparseable: %#v", name, tc.schedule, err)
}
expectedRequeueAfter := nextScheduledTimeDuration(sched, tc.now)
if !reflect.DeepEqual(requeueAfter, expectedRequeueAfter) {
t.Errorf("%s: expected requeueAfter: %+v, got requeueAfter time: %+v", name, expectedRequeueAfter, requeueAfter)
}
}
expectedCreates := 0
if tc.expectCreate {
expectedCreates = 1
}
if len(jc.Jobs) != expectedCreates {
t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs))
}
for i := range jc.Jobs {
job := &jc.Jobs[i]
controllerRef := metav1.GetControllerOf(job)
if controllerRef == nil {
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
} else {
if got, want := controllerRef.APIVersion, "batch/v1beta1"; got != want {
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
}
if got, want := controllerRef.Kind, "CronJob"; got != want {
t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want)
}
if got, want := controllerRef.Name, cj.Name; got != want {
t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want)
}
if got, want := controllerRef.UID, cj.UID; got != want {
t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want)
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
t.Errorf("%s: controllerRef.Controller is not set to true", name)
}
}
}
expectedDeletes := 0
if tc.expectDelete {
expectedDeletes = 1
}
if len(jc.DeleteJobName) != expectedDeletes {
t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName))
}
// Status update happens once when ranging through the job list, and one more time if a job is created.
expectUpdates := 1
expectedEvents := 0
if tc.expectCreate {
expectedEvents++
expectUpdates++
}
if tc.expectDelete {
expectedEvents++
}
if name == "still active, is time, F" {
// this is the only test case where we would raise an event for not scheduling
expectedEvents++
}
expectedEvents += tc.expectedWarnings
if len(recorder.Events) != expectedEvents {
t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
}
numWarnings := 0
for i := 1; i <= len(recorder.Events); i++ {
e := <-recorder.Events
if strings.HasPrefix(e, v1.EventTypeWarning) {
numWarnings++
}
}
if numWarnings != tc.expectedWarnings {
t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings)
}
if len(cjc.Updates) == expectUpdates && tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) {
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active))
}
if &cj == cjCopy {
t.Errorf("syncCronJob is not creating a copy of the original cronjob")
}
})
}
}
// this test will take around 61 seconds to complete
func TestController2_updateCronJob(t *testing.T) {
cjc := &fakeCJControl{}
jc := &fakeJobControl{}
type fields struct {
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
jobControl jobControlInterface
cronJobControl cjControlInterface
}
type args struct {
oldJobTemplate *batchv1beta1.JobTemplateSpec
newJobTemplate *batchv1beta1.JobTemplateSpec
oldJobSchedule string
newJobSchedule string
}
tests := []struct {
name string
fields fields
args args
deltaTimeForQueue time.Duration
roundOffTimeDuration time.Duration
}{
{
name: "spec.template changed",
fields: fields{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-update-cronjob"),
recorder: record.NewFakeRecorder(10),
jobControl: jc,
cronJobControl: cjc,
},
args: args{
oldJobTemplate: &batchv1beta1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: jobSpec(),
},
newJobTemplate: &batchv1beta1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "foo"},
Annotations: map[string]string{"x": "y"},
},
Spec: jobSpec(),
},
},
deltaTimeForQueue: 0 * time.Second,
roundOffTimeDuration: 500*time.Millisecond + nextScheduleDelta,
},
{
name: "spec.schedule changed",
fields: fields{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-update-cronjob"),
recorder: record.NewFakeRecorder(10),
jobControl: jc,
cronJobControl: cjc,
},
args: args{
oldJobSchedule: "30 * * * *",
newJobSchedule: "1 * * * *",
},
deltaTimeForQueue: 61*time.Second + nextScheduleDelta,
roundOffTimeDuration: 750 * time.Millisecond,
},
// TODO: Add more test cases for updating scheduling.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cj := cronJob()
newCj := cronJob()
if tt.args.oldJobTemplate != nil {
cj.Spec.JobTemplate = *tt.args.oldJobTemplate
}
if tt.args.newJobTemplate != nil {
newCj.Spec.JobTemplate = *tt.args.newJobTemplate
}
if tt.args.oldJobSchedule != "" {
cj.Spec.Schedule = tt.args.oldJobSchedule
}
if tt.args.newJobSchedule != "" {
newCj.Spec.Schedule = tt.args.newJobSchedule
}
jm := &ControllerV2{
queue: tt.fields.queue,
recorder: tt.fields.recorder,
jobControl: tt.fields.jobControl,
cronJobControl: tt.fields.cronJobControl,
}
jm.now = justASecondBeforeTheHour
now := time.Now()
then := time.Now()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
now = time.Now()
jm.queue.Get()
then = time.Now()
wg.Done()
return
}()
jm.updateCronJob(&cj, &newCj)
wg.Wait()
d := then.Sub(now)
if d.Round(tt.roundOffTimeDuration).Seconds() != tt.deltaTimeForQueue.Round(tt.roundOffTimeDuration).Seconds() {
t.Errorf("Expected %#v got %#v", tt.deltaTimeForQueue.Round(tt.roundOffTimeDuration).String(), d.Round(tt.roundOffTimeDuration).String())
}
})
}
}
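// FakeNamespacedJobLister is a minimal in-memory JobLister used by these tests; calling
// Jobs(namespace) records the namespace so that subsequent List and Get calls filter the
// stored jobs by it.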
type FakeNamespacedJobLister struct {
jobs []*batchv1.Job
namespace string
}
func (f *FakeNamespacedJobLister) Get(name string) (*batchv1.Job, error) {
for _, j := range f.jobs {
if j.Namespace == f.namespace && j.Name == name {
return j, nil
}
}
return nil, fmt.Errorf("Not Found")
}
func (f *FakeNamespacedJobLister) List(selector labels.Selector) ([]*batchv1.Job, error) {
ret := []*batchv1.Job{}
for _, j := range f.jobs {
if f.namespace != "" && f.namespace != j.Namespace {
continue
}
if selector.Matches(labels.Set(j.GetLabels())) {
ret = append(ret, j)
}
}
return ret, nil
}
func (f *FakeNamespacedJobLister) Jobs(namespace string) batchv1listers.JobNamespaceLister {
f.namespace = namespace
return f
}
func (f *FakeNamespacedJobLister) GetPodJobs(pod *v1.Pod) (jobs []batchv1.Job, err error) {
panic("implement me")
}
func TestControllerV2_getJobList(t *testing.T) {
trueRef := true
type fields struct {
jobLister batchv1listers.JobLister
}
type args struct {
cronJob *batchv1beta1.CronJob
}
tests := []struct {
name string
fields fields
args args
want []batchv1.Job
wantErr bool
}{
{
name: "test getting jobs in namespace without controller reference",
fields: fields{
&FakeNamespacedJobLister{jobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"},
},
}}},
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
want: []batchv1.Job{},
},
{
name: "test getting jobs in namespace with a controller reference",
fields: fields{
&FakeNamespacedJobLister{jobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns",
OwnerReferences: []metav1.OwnerReference{
{
Name: "fooer",
Controller: &trueRef,
},
}},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "foo-ns"},
},
}}},
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
want: []batchv1.Job{{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns",
OwnerReferences: []metav1.OwnerReference{
{
Name: "fooer",
Controller: &trueRef,
},
}},
}},
},
{
name: "test getting jobs in other namespaces ",
fields: fields{
&FakeNamespacedJobLister{jobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar-ns"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "bar-ns"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "bar-ns"},
},
}}},
args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}},
want: []batchv1.Job{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
jm := &ControllerV2{
jobLister: tt.fields.jobLister,
}
got, err := jm.getJobsToBeReconciled(tt.args.cronJob)
if (err != nil) != tt.wantErr {
t.Errorf("getJobsToBeReconciled() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getJobsToBeReconciled() got = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -35,6 +35,8 @@ import (
// created as an interface to allow testing.
type cjControlInterface interface {
UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error)
// GetCronJob retrieves a CronJob.
GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error)
}
// realCJControl is the default implementation of cjControlInterface.
@@ -42,6 +44,10 @@ type realCJControl struct {
KubeClient clientset.Interface
}
func (c *realCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) {
return c.KubeClient.BatchV1beta1().CronJobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
var _ cjControlInterface = &realCJControl{}
func (c *realCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
@@ -53,6 +59,10 @@ type fakeCJControl struct {
Updates []batchv1beta1.CronJob
}
func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1beta1.CronJob, error) {
panic("implement me")
}
var _ cjControlInterface = &fakeCJControl{}
func (c *fakeCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {

View File

@@ -25,9 +25,10 @@ import (
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
)
// Utilities for dealing with Jobs and CronJobs and time.
@@ -45,7 +46,9 @@ func deleteFromActiveList(cj *batchv1beta1.CronJob, uid types.UID) {
if cj == nil {
return
}
newActive := []v1.ObjectReference{}
// TODO: @alpatel the memory footprint could maybe be reduced here by
// cj.Status.Active = append(cj.Status.Active[:indexToRemove], cj.Status.Active[indexToRemove+1:]...)
newActive := []corev1.ObjectReference{}
for _, j := range cj.Status.Active {
if j.UID != uid {
newActive = append(newActive, j)
@@ -147,6 +150,71 @@ func getRecentUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time) ([]time
return starts, nil
}
// getUnmetScheduleTimes gets the slice of all the missed start times from the time the job
// was last scheduled up to `now`.
//
// If there are too many (>100) unstarted times, it will raise a warning but still return
// the list of missed times.
func getUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) []time.Time {
starts := []time.Time{}
var earliestTime time.Time
if cj.Status.LastScheduleTime != nil {
earliestTime = cj.Status.LastScheduleTime.Time
} else {
// If none found, then this is either a recently created cronJob,
// or the active/completed info was somehow lost (contract for status
// in kubernetes says it may need to be recreated), or that we have
// started a job, but have not noticed it yet (distributed systems can
// have arbitrary delays). In any case, use the creation time of the
// CronJob as last known start time.
earliestTime = cj.ObjectMeta.CreationTimestamp.Time
}
if cj.Spec.StartingDeadlineSeconds != nil {
// Controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
if earliestTime.After(now) {
return []time.Time{}
}
// t := schedule.Next(earliestTime)
// t1 := schedule.Next(t)
// delta := t1 - t
// missed := now - earliestTime/delta
// last missed = earliestTime + delta * (missed - 1)
// TODO: @alpatel, convert the following for loop into above logic and add test cases
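// For example, with an hourly on-the-hour schedule, earliestTime = 10:00 and now = 13:30,
// the loop below collects 11:00, 12:00 and 13:00; the caller then uses the last entry
// (13:00) as the most recent missed start time.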
for t := schedule.Next(earliestTime); !t.After(now); t = schedule.Next(t) {
starts = append(starts, t)
}
if len(starts) > 100 {
// An object might miss several starts. For example, if
// controller gets wedged on friday at 5:01pm when everyone has
// gone home, and someone comes in on tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly cronJob, should
// all start running with no further intervention (if the cronJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we want to not try to list
// all the missed start times.
//
// I've somewhat arbitrarily picked 100, as more than 80,
// but less than "lots".
recorder.Eventf(&cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", len(starts))
klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missed times", len(starts))
}
return starts
}
// getJobFromTemplate makes a Job from a CronJob
func getJobFromTemplate(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
labels := copyLabels(&cj.Spec.JobTemplate)
@@ -171,9 +239,35 @@ func getTimeHash(scheduledTime time.Time) int64 {
return scheduledTime.Unix()
}
// getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from
// epoch time and appends that to the job name, because the cronjob controller v2 schedules
// jobs with a granularity of one minute.
func getJobFromTemplate2(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
labels := copyLabels(&cj.Spec.JobTemplate)
annotations := copyAnnotations(&cj.Spec.JobTemplate)
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
name := getJobName(cj, scheduledTime)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: annotations,
Name: name,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
},
}
cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
return job, nil
}
// getTimeHashInMinutes returns the Unix epoch time in minutes
func getTimeHashInMinutes(scheduledTime time.Time) int64 {
return scheduledTime.Unix() / 60
}
func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
return true, c.Type
}
}

View File

@@ -23,11 +23,13 @@ import (
"testing"
"time"
cron "github.com/robfig/cron"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
utilpointer "k8s.io/utils/pointer"
)
@ -89,6 +91,64 @@ func TestGetJobFromTemplate(t *testing.T) {
}
}
func TestGetJobFromTemplate2(t *testing.T) {
// getJobFromTemplate2() needs to take the job template, copy the labels, annotations,
// and other fields, and add a controller owner reference.
var one int64 = 1
var no bool
cj := batchv1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob",
},
Spec: batchv1beta1.CronJobSpec{
Schedule: "* * * * ?",
ConcurrencyPolicy: batchv1beta1.AllowConcurrent,
JobTemplate: batchv1beta1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &one,
ManualSelector: &no,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Image: "foo/bar"},
},
},
},
},
},
},
}
var job *batchv1.Job
job, err := getJobFromTemplate2(&cj, time.Time{})
if err != nil {
t.Errorf("Did not expect error: %s", err)
}
if !strings.HasPrefix(job.ObjectMeta.Name, "mycronjob-") {
t.Errorf("Wrong Name")
}
if len(job.ObjectMeta.Labels) != 1 {
t.Errorf("Wrong number of labels")
}
if len(job.ObjectMeta.Annotations) != 1 {
t.Errorf("Wrong number of annotations")
}
}
func TestGetParentUIDFromJob(t *testing.T) {
j := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
@ -242,6 +302,137 @@ func TestGroupJobsByParent(t *testing.T) {
}
}
func TestGetLatestUnmetScheduleTimes(t *testing.T) {
// schedule is hourly on the hour
schedule := "0 * * * ?"
ParseSchedule := func(schedule string) cron.Schedule {
sched, err := cron.ParseStandard(schedule)
if err != nil {
t.Errorf("Error parsing schedule: %#v", err)
return nil
}
return sched
}
recorder := record.NewFakeRecorder(50)
// T1 is a scheduled start time of that schedule
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
t.Errorf("test setup error: %v", err)
}
// T2 is a scheduled start time of that schedule after T1
T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z")
if err != nil {
t.Errorf("test setup error: %v", err)
}
cj := batchv1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batchv1beta1.CronJobSpec{
Schedule: schedule,
ConcurrencyPolicy: batchv1beta1.AllowConcurrent,
JobTemplate: batchv1beta1.JobTemplateSpec{},
},
}
{
// Case 1: no known start times, and none needed yet.
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is after the creation time, but before T1.
now := T1.Add(-7 * time.Minute)
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) != 0 {
t.Errorf("expected no start times, got: %v", times)
}
}
{
// Case 2: no known start times, and one needed.
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is after T1
now := T1.Add(2 * time.Second)
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) != 1 {
t.Errorf("expected 1 start time, got: %v", times)
} else if !times[0].Equal(T1) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
}
{
// Case 3: known LastScheduleTime, no start needed.
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Status shows a start at the expected time.
cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
// Current time is after T1
now := T1.Add(2 * time.Minute)
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) != 0 {
t.Errorf("expected 0 start times, got: %v", times)
}
}
{
// Case 4: known LastScheduleTime, a start needed
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Status shows a start at the expected time.
cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
// Current time is after T1 and after T2
now := T2.Add(5 * time.Minute)
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) != 1 {
t.Errorf("expected 1 start times, got: %v", times)
} else if !times[0].Equal(T2) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
}
{
// Case 5: known LastScheduleTime, two starts needed
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
// Current time is after T1 and after T2
now := T2.Add(5 * time.Minute)
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) != 2 {
t.Errorf("expected 2 start times, got: %v", times)
} else {
if !times[0].Equal(T1) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
if !times[1].Equal(T2) {
t.Errorf("expected: %v, got: %v", T2, times[1])
}
}
}
{
// Case 6: now is way way ahead of last start time, and there is no deadline.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
now := T2.Add(10 * 24 * time.Hour)
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) == 0 {
t.Errorf("expected more than 0 missed times")
}
}
{
// Case 7: now is way way ahead of last start time, but there is a short deadline.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
now := T2.Add(10 * 24 * time.Hour)
// Deadline is short
deadline := int64(2 * 60 * 60)
cj.Spec.StartingDeadlineSeconds = &deadline
times := getUnmetScheduleTimes(cj, now, ParseSchedule(cj.Spec.Schedule), recorder)
if len(times) == 0 {
t.Errorf("expected more than 0 missed times")
}
}
}
func TestGetRecentUnmetScheduleTimes(t *testing.T) {
// schedule is hourly on the hour
schedule := "0 * * * ?"

View File

@ -560,6 +560,16 @@ const (
// Enable all logic related to the PodDisruptionBudget API object in policy
PodDisruptionBudget featuregate.Feature = "PodDisruptionBudget"
// owner: @alaypatel07, @soltysh
// alpha: v1.20
// beta: v1.21
//
// CronJobControllerV2 controls whether the controller manager starts the old cronjob
// controller or the new one, which is implemented with informers and a delaying queue.
//
// This feature is deprecated, and will be removed in v1.22.
CronJobControllerV2 featuregate.Feature = "CronJobControllerV2"
// owner: @m1093782566
// alpha: v1.17
//
@ -754,6 +764,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23
AllowInsecureBackendProxy: {Default: true, PreRelease: featuregate.Beta},
PodDisruptionBudget: {Default: true, PreRelease: featuregate.Beta},
CronJobControllerV2: {Default: false, PreRelease: featuregate.Alpha},
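// As a usage note: the v2 controller can be tried by starting kube-controller-manager with the
// standard feature-gate flag, e.g. --feature-gates=CronJobControllerV2=true.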
ServiceTopology: {Default: false, PreRelease: featuregate.Alpha},
ServiceAppProtocol: {Default: true, PreRelease: featuregate.Beta},
ImmutableEphemeralVolumes: {Default: true, PreRelease: featuregate.Beta},

View File

@ -170,6 +170,32 @@ var _ = SIGDescribe("CronJob", func() {
framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
})
ginkgo.It("should be able to schedule after more than 100 missed schedule", func() {
ginkgo.By("Creating a cronjob")
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1beta1.ForbidConcurrent,
sleepCommand, nil, nil)
creationTime := time.Now().Add(-99 * 24 * time.Hour)
lastScheduleTime := creationTime.Add(-1 * 24 * time.Hour)
cronJob.CreationTimestamp = metav1.Time{Time: creationTime}
cronJob.Status.LastScheduleTime = &metav1.Time{Time: lastScheduleTime}
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name)
ginkgo.By("Ensuring one job is running")
err = waitForActiveJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
ginkgo.By("Ensuring at least one running jobs exists by listing jobs explicitly")
jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name)
activeJobs, _ := filterActiveJobs(jobs)
gomega.Expect(len(activeJobs)).To(gomega.BeNumerically(">=", 1))
ginkgo.By("Removing cronjob")
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name)
})
// shouldn't give us unexpected warnings
ginkgo.It("should not emit unexpected warnings", func() {
ginkgo.By("Creating a cronjob")