diff --git a/pkg/controller/cronjob/cronjob_controller.go b/pkg/controller/cronjob/cronjob_controller.go deleted file mode 100644 index c1ea073f17b..00000000000 --- a/pkg/controller/cronjob/cronjob_controller.go +++ /dev/null @@ -1,387 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cronjob - -/* -I did not use watch or expectations. Those add a lot of corner cases, and we aren't -expecting a large volume of jobs or cronJobs. (We are favoring correctness -over scalability. If we find a single controller thread is too slow because -there are a lot of Jobs or CronJobs, we can parallelize by Namespace. -If we find the load on the API server is too high, we can use a watch and -UndeltaStore.) - -Just periodically list jobs and cronJobs, and then reconcile them. -*/ - -import ( - "context" - "fmt" - "sort" - "time" - - "k8s.io/klog/v2" - - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/pager" - "k8s.io/client-go/tools/record" - ref "k8s.io/client-go/tools/reference" - "k8s.io/component-base/metrics/prometheus/ratelimiter" -) - -// Utilities for dealing with Jobs and CronJobs and time. - -// controllerKind contains the schema.GroupVersionKind for this controller type. -var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob") - -// Controller is a controller for CronJobs. -type Controller struct { - kubeClient clientset.Interface - jobControl jobControlInterface - cjControl cjControlInterface - podControl podControlInterface - recorder record.EventRecorder -} - -// NewController creates and initializes a new Controller. -func NewController(kubeClient clientset.Interface) (*Controller, error) { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - - if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { - if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { - return nil, err - } - } - - jm := &Controller{ - kubeClient: kubeClient, - jobControl: realJobControl{KubeClient: kubeClient}, - cjControl: &realCJControl{KubeClient: kubeClient}, - podControl: &realPodControl{KubeClient: kubeClient}, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}), - } - - return jm, nil -} - -// Run starts the main goroutine responsible for watching and syncing jobs. -func (jm *Controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - klog.Infof("Starting CronJob Manager") - // Check things every 10 second. - go wait.Until(jm.syncAll, 10*time.Second, stopCh) - <-stopCh - klog.Infof("Shutting down CronJob Manager") -} - -// syncAll lists all the CronJobs and Jobs and reconciles them. -func (jm *Controller) syncAll() { - // List children (Jobs) before parents (CronJob). - // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer, - // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639). - // Note that this only works because we are NOT using any caches here. - jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) { - return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts) - } - - js := make([]batchv1.Job, 0) - err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error { - jobTmp, ok := object.(*batchv1.Job) - if !ok { - return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp) - } - js = append(js, *jobTmp) - return nil - }) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err)) - return - } - klog.V(4).Infof("Found %d jobs", len(js)) - - jobsByCj := groupJobsByParent(js) - klog.V(4).Infof("Found %d groups", len(jobsByCj)) - - err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts) - }).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error { - cj, ok := object.(*batchv1.CronJob) - if !ok { - return fmt.Errorf("expected type *batchv1.CronJob, got type %T", cj) - } - syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder) - cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder) - return nil - }) - - if err != nil { - utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err)) - return - } -} - -// cleanupFinishedJobs cleanups finished jobs created by a CronJob -func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, - cjc cjControlInterface, recorder record.EventRecorder) { - // If neither limits are active, there is no need to do anything. - if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { - return - } - - failedJobs := []batchv1.Job{} - successfulJobs := []batchv1.Job{} - - for _, job := range js { - isFinished, finishedStatus := getFinishedStatus(&job) - if isFinished && finishedStatus == batchv1.JobComplete { - successfulJobs = append(successfulJobs, job) - } else if isFinished && finishedStatus == batchv1.JobFailed { - failedJobs = append(failedJobs, job) - } - } - - if cj.Spec.SuccessfulJobsHistoryLimit != nil { - removeOldestJobs(cj, - successfulJobs, - jc, - *cj.Spec.SuccessfulJobsHistoryLimit, - recorder) - } - - if cj.Spec.FailedJobsHistoryLimit != nil { - removeOldestJobs(cj, - failedJobs, - jc, - *cj.Spec.FailedJobsHistoryLimit, - recorder) - } - - // Update the CronJob, in case jobs were removed from the list. - if _, err := cjc.UpdateStatus(cj); err != nil { - nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) - klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err) - } -} - -// removeOldestJobs removes the oldest jobs from a list of jobs -func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) { - numToDelete := len(js) - int(maxJobs) - if numToDelete <= 0 { - return - } - - nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) - klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog) - - sort.Sort(byJobStartTime(js)) - for i := 0; i < numToDelete; i++ { - klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog) - deleteJob(cj, &js[i], jc, recorder) - } -} - -// syncOne reconciles a CronJob with a list of any Jobs that it created. -// All known jobs created by "cj" should be included in "js". -// The current time is passed in to facilitate testing. -// It has no receiver, to facilitate testing. -func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) { - nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) - - childrenJobs := make(map[types.UID]bool) - for _, j := range js { - childrenJobs[j.ObjectMeta.UID] = true - found := inActiveList(*cj, j.ObjectMeta.UID) - if !found && !IsJobFinished(&j) { - recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) - // We found an unfinished job that has us as the parent, but it is not in our Active list. - // This could happen if we crashed right after creating the Job and before updating the status, - // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created - // a job that they wanted us to adopt. - - // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't - // stop users from creating jobs if they have permission. It is assumed that if a - // user has permission to create a job within a namespace, then they have permission to make any cronJob - // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way. - // TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about? - } else if found && IsJobFinished(&j) { - _, status := getFinishedStatus(&j) - deleteFromActiveList(cj, j.ObjectMeta.UID) - recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) - } - } - - // Remove any job reference from the active list if the corresponding job does not exist any more. - // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching - // job running. - for _, j := range cj.Status.Active { - if found := childrenJobs[j.UID]; !found { - recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) - deleteFromActiveList(cj, j.UID) - } - } - - updatedCJ, err := cjc.UpdateStatus(cj) - if err != nil { - klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err) - return - } - *cj = *updatedCJ - - if cj.DeletionTimestamp != nil { - // The CronJob is being deleted. - // Don't do anything other than updating status. - return - } - - if cj.Spec.Suspend != nil && *cj.Spec.Suspend { - klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) - return - } - - times, err := getRecentUnmetScheduleTimes(*cj, now) - if err != nil { - recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err) - klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) - return - } - // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed. - if len(times) == 0 { - klog.V(4).Infof("No unmet start times for %s", nameForLog) - return - } - if len(times) > 1 { - klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog) - } - - scheduledTime := times[len(times)-1] - tooLate := false - if cj.Spec.StartingDeadlineSeconds != nil { - tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now) - } - if tooLate { - klog.V(4).Infof("Missed starting window for %s", nameForLog) - recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z)) - // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing - // the miss every cycle. In order to avoid sending multiple events, and to avoid processing - // the cj again and again, we could set a Status.LastMissedTime when we notice a miss. - // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp, - // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate - // and event the next time we process it, and also so the user looking at the status - // can see easily that there was a missed execution. - return - } - if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 { - // Regardless which source of information we use for the set of active jobs, - // there is some risk that we won't see an active job when there is one. - // (because we haven't seen the status update to the SJ or the created pod). - // So it is theoretically possible to have concurrency with Forbid. - // As long the as the invocations are "far enough apart in time", this usually won't happen. - // - // TODO: for Forbid, we could use the same name for every execution, as a lock. - // With replace, we could use a name that is deterministic per execution time. - // But that would mean that you could not inspect prior successes or failures of Forbid jobs. - klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog) - return - } - if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent { - for _, j := range cj.Status.Active { - klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) - - job, err := jc.GetJob(j.Namespace, j.Name) - if err != nil { - recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) - return - } - if !deleteJob(cj, job, jc, recorder) { - return - } - } - } - - jobReq, err := getJobFromTemplate(cj, scheduledTime) - if err != nil { - klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err) - return - } - jobResp, err := jc.CreateJob(cj.Namespace, jobReq) - if err != nil { - // If the namespace is being torn down, we can safely ignore - // this error since all subsequent creations will fail. - if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { - recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) - } - return - } - klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog) - recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) - - // ------------------------------------------------------------------ // - - // If this process restarts at this point (after posting a job, but - // before updating the status), then we might try to start the job on - // the next time. Actually, if we re-list the SJs and Jobs on the next - // iteration of syncAll, we might not see our own status update, and - // then post one again. So, we need to use the job name as a lock to - // prevent us from making the job twice (name the job with hash of its - // scheduled time). - - // Add the just-started job to the status list. - ref, err := getRef(jobResp) - if err != nil { - klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog) - } else { - cj.Status.Active = append(cj.Status.Active, *ref) - } - cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} - if _, err := cjc.UpdateStatus(cj); err != nil { - klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err) - } - - return -} - -// deleteJob reaps a job, deleting the job, the pods and the reference in the active list -func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool { - nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) - - // delete the job itself... - if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { - recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) - klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err) - return false - } - // ... and its reference from active list - deleteFromActiveList(cj, job.ObjectMeta.UID) - recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name) - - return true -} - -func getRef(object runtime.Object) (*v1.ObjectReference, error) { - return ref.GetReference(scheme.Scheme, object) -} diff --git a/pkg/controller/cronjob/cronjob_controller_test.go b/pkg/controller/cronjob/cronjob_controller_test.go deleted file mode 100644 index 29dac2b6063..00000000000 --- a/pkg/controller/cronjob/cronjob_controller_test.go +++ /dev/null @@ -1,786 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cronjob - -import ( - "strconv" - "strings" - "testing" - "time" - - batchv1 "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/record" - // For the cronjob controller to do conversions. - _ "k8s.io/kubernetes/pkg/apis/batch/install" - _ "k8s.io/kubernetes/pkg/apis/core/install" -) - -var ( - // schedule is hourly on the hour - onTheHour = "0 * * * ?" - errorSchedule = "obvious error schedule" -) - -func justBeforeTheHour() time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z") - if err != nil { - panic("test setup error") - } - return T1 -} - -func topOfTheHour() *time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") - if err != nil { - panic("test setup error") - } - return &T1 -} - -func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") - if err != nil { - panic("test setup error") - } - t := T1.Add(duration) - return &t -} - -func justAfterTheHour() *time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z") - if err != nil { - panic("test setup error") - } - return &T1 -} - -func weekAfterTheHour() time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-26T10:00:00Z") - if err != nil { - panic("test setup error") - } - return T1 -} - -func justBeforeThePriorHour() time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z") - if err != nil { - panic("test setup error") - } - return T1 -} - -func justAfterThePriorHour() time.Time { - T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z") - if err != nil { - panic("test setup error") - } - return T1 -} - -func startTimeStringToTime(startTime string) time.Time { - T1, err := time.Parse(time.RFC3339, startTime) - if err != nil { - panic("test setup error") - } - return T1 -} - -// returns a cronJob with some fields filled in. -func cronJob() batchv1.CronJob { - return batchv1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mycronjob", - Namespace: "snazzycats", - UID: types.UID("1a2b3c"), - CreationTimestamp: metav1.Time{Time: justBeforeTheHour()}, - }, - Spec: batchv1.CronJobSpec{ - Schedule: "* * * * ?", - ConcurrencyPolicy: batchv1.AllowConcurrent, - JobTemplate: batchv1.JobTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a": "b"}, - Annotations: map[string]string{"x": "y"}, - }, - Spec: jobSpec(), - }, - }, - } -} - -func jobSpec() batchv1.JobSpec { - one := int32(1) - return batchv1.JobSpec{ - Parallelism: &one, - Completions: &one, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - {Image: "foo/bar"}, - }, - }, - }, - } -} - -func newJob(UID string) batchv1.Job { - return batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(UID), - Name: "foobar", - Namespace: metav1.NamespaceDefault, - }, - Spec: jobSpec(), - } -} - -var ( - shortDead int64 = 10 - mediumDead int64 = 2 * 60 * 60 - longDead int64 = 1000000 - noDead int64 = -12345 - A = batchv1.AllowConcurrent - f = batchv1.ForbidConcurrent - R = batchv1.ReplaceConcurrent - T = true - F = false -) - -func TestSyncOne_RunOrNot(t *testing.T) { - // Check expectations on deadline parameters - if shortDead/60/60 >= 1 { - t.Errorf("shortDead should be less than one hour") - } - - if mediumDead/60/60 < 1 || mediumDead/60/60 >= 24 { - t.Errorf("mediumDead should be between one hour and one day") - } - - if longDead/60/60/24 < 10 { - t.Errorf("longDead should be at least ten days") - } - - testCases := map[string]struct { - // cj spec - concurrencyPolicy batchv1.ConcurrencyPolicy - suspend bool - schedule string - deadline int64 - - // cj status - ranPreviously bool - stillActive bool - - // environment - now time.Time - - // expectations - expectCreate bool - expectDelete bool - expectActive int - expectedWarnings int - }{ - "never ran, not valid schedule, A": {A, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1}, - "never ran, not valid schedule, F": {f, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1}, - "never ran, not valid schedule, R": {f, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1}, - "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0}, - "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0}, - "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0}, - "never ran, is time, A": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0}, - "never ran, is time, F": {f, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0}, - "never ran, is time, R": {R, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0}, - "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, 0, 0}, - "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, *justAfterTheHour(), F, F, 0, 0}, - "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, *justAfterTheHour(), T, F, 1, 0}, - - "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0}, - "prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0}, - "prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0}, - "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0}, - "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0}, - "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0}, - "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, 0, 0}, - "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), F, F, 0, 0}, - "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), T, F, 1, 0}, - - "still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0}, - "still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0}, - "still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0}, - "still active, is time, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, 2, 0}, - "still active, is time, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, 1, 0}, - "still active, is time, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, T, 1, 0}, - "still active, is time, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, 1, 0}, - "still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), F, F, 1, 0}, - "still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), T, F, 2, 0}, - - // Controller should fail to schedule these, as there are too many missed starting times - // and either no deadline or a too long deadline. - "prev ran but done, long overdue, not past deadline, A": {A, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1}, - "prev ran but done, long overdue, not past deadline, R": {R, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1}, - "prev ran but done, long overdue, not past deadline, F": {f, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1}, - "prev ran but done, long overdue, no deadline, A": {A, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1}, - "prev ran but done, long overdue, no deadline, R": {R, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1}, - "prev ran but done, long overdue, no deadline, F": {f, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1}, - - "prev ran but done, long overdue, past medium deadline, A": {A, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0}, - "prev ran but done, long overdue, past short deadline, A": {A, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0}, - - "prev ran but done, long overdue, past medium deadline, R": {R, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0}, - "prev ran but done, long overdue, past short deadline, R": {R, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0}, - - "prev ran but done, long overdue, past medium deadline, F": {f, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0}, - "prev ran but done, long overdue, past short deadline, F": {f, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0}, - } - for name, tc := range testCases { - // Don't delete the redundant define 'name' and 'tc', keep those lines for goroutines. - name := name - tc := tc - t.Run(name, func(t *testing.T) { - cj := cronJob() - cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy - cj.Spec.Suspend = &tc.suspend - cj.Spec.Schedule = tc.schedule - if tc.deadline != noDead { - cj.Spec.StartingDeadlineSeconds = &tc.deadline - } - - var ( - job *batchv1.Job - err error - ) - js := []batchv1.Job{} - if tc.ranPreviously { - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()} - cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()} - job, err = getJobFromTemplate(&cj, cj.Status.LastScheduleTime.Time) - if err != nil { - t.Fatalf("%s: unexpected error creating a job from template: %v", name, err) - } - job.UID = "1234" - job.Namespace = "" - if tc.stillActive { - cj.Status.Active = []v1.ObjectReference{{UID: job.UID}} - js = append(js, *job) - } - } else { - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()} - if tc.stillActive { - t.Errorf("%s: test setup error: this case makes no sense", name) - } - } - - jc := &fakeJobControl{Job: job} - cjc := &fakeCJControl{} - recorder := record.NewFakeRecorder(10) - - syncOne(&cj, js, tc.now, jc, cjc, recorder) - expectedCreates := 0 - if tc.expectCreate { - expectedCreates = 1 - } - if len(jc.Jobs) != expectedCreates { - t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs)) - } - for i := range jc.Jobs { - job := &jc.Jobs[i] - controllerRef := metav1.GetControllerOf(job) - if controllerRef == nil { - t.Errorf("%s: expected job to have ControllerRef: %#v", name, job) - } else { - if got, want := controllerRef.APIVersion, "batch/v1"; got != want { - t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want) - } - if got, want := controllerRef.Kind, "CronJob"; got != want { - t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want) - } - if got, want := controllerRef.Name, cj.Name; got != want { - t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want) - } - if got, want := controllerRef.UID, cj.UID; got != want { - t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want) - } - if controllerRef.Controller == nil || *controllerRef.Controller != true { - t.Errorf("%s: controllerRef.Controller is not set to true", name) - } - } - } - - expectedDeletes := 0 - if tc.expectDelete { - expectedDeletes = 1 - } - if len(jc.DeleteJobName) != expectedDeletes { - t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName)) - } - - // Status update happens once when ranging through job list, and another one if create jobs. - expectUpdates := 1 - expectedEvents := 0 - if tc.expectCreate { - expectedEvents++ - expectUpdates++ - } - if tc.expectDelete { - expectedEvents++ - } - expectedEvents += tc.expectedWarnings - - if len(recorder.Events) != expectedEvents { - t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events)) - } - - numWarnings := 0 - for i := 1; i <= len(recorder.Events); i++ { - e := <-recorder.Events - if strings.HasPrefix(e, v1.EventTypeWarning) { - numWarnings++ - } - } - if numWarnings != tc.expectedWarnings { - t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings) - } - - if tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) { - t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active)) - } - }) - } -} - -type CleanupJobSpec struct { - StartTime string - IsFinished bool - IsSuccessful bool - ExpectDelete bool - IsStillInActiveList bool // only when IsFinished is set -} - -func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) { - limitThree := int32(3) - limitTwo := int32(2) - limitOne := int32(1) - limitZero := int32(0) - - // Starting times are assumed to be sorted by increasing start time - // in all the test cases - testCases := map[string]struct { - jobSpecs []CleanupJobSpec - now time.Time - successfulJobsHistoryLimit *int32 - failedJobsHistoryLimit *int32 - expectActive int - }{ - "success. job limit reached": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, T, F}, - {"2016-05-19T05:00:00Z", T, T, T, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", F, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), &limitTwo, &limitOne, 1}, - - "success. jobs not processed by Sync yet": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, T, F}, - {"2016-05-19T05:00:00Z", T, T, T, T}, - {"2016-05-19T06:00:00Z", T, T, F, T}, - {"2016-05-19T07:00:00Z", T, T, F, T}, - {"2016-05-19T08:00:00Z", F, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, T}, - }, justBeforeTheHour(), &limitTwo, &limitOne, 4}, - - "failed job limit reached": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, F, T, F}, - {"2016-05-19T05:00:00Z", T, F, T, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", T, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), &limitTwo, &limitTwo, 0}, - - "success. job limit set to zero": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, T, F}, - {"2016-05-19T05:00:00Z", T, F, T, F}, - {"2016-05-19T06:00:00Z", T, T, T, F}, - {"2016-05-19T07:00:00Z", T, T, T, F}, - {"2016-05-19T08:00:00Z", F, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), &limitZero, &limitOne, 1}, - - "failed job limit set to zero": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, F, F}, - {"2016-05-19T05:00:00Z", T, F, T, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", F, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, T, F}, - }, justBeforeTheHour(), &limitThree, &limitZero, 1}, - - "no limits reached": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, F, F}, - {"2016-05-19T05:00:00Z", T, F, F, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", T, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), &limitThree, &limitThree, 0}, - - // This test case should trigger the short-circuit - "limits disabled": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, F, F}, - {"2016-05-19T05:00:00Z", T, F, F, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", T, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), nil, nil, 0}, - - "success limit disabled": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, F, F}, - {"2016-05-19T05:00:00Z", T, F, F, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", T, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), nil, &limitThree, 0}, - - "failure limit disabled": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, T, F, F}, - {"2016-05-19T05:00:00Z", T, F, F, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", T, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), &limitThree, nil, 0}, - - "no limits reached because still active": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", F, F, F, F}, - {"2016-05-19T05:00:00Z", F, F, F, F}, - {"2016-05-19T06:00:00Z", F, F, F, F}, - {"2016-05-19T07:00:00Z", F, F, F, F}, - {"2016-05-19T08:00:00Z", F, F, F, F}, - {"2016-05-19T09:00:00Z", F, F, F, F}, - }, justBeforeTheHour(), &limitZero, &limitZero, 6}, - } - - for name, tc := range testCases { - // Don't delete the redundant define 'name' and 'tc', keep those lines for goroutines. - name := name - tc := tc - t.Run(name, func(t *testing.T) { - cj := cronJob() - suspend := false - cj.Spec.ConcurrencyPolicy = f - cj.Spec.Suspend = &suspend - cj.Spec.Schedule = onTheHour - - cj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit - cj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit - - var ( - job *batchv1.Job - err error - ) - - // Set consistent timestamps for the CronJob - if len(tc.jobSpecs) != 0 { - firstTime := startTimeStringToTime(tc.jobSpecs[0].StartTime) - lastTime := startTimeStringToTime(tc.jobSpecs[len(tc.jobSpecs)-1].StartTime) - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime} - cj.Status.LastScheduleTime = &metav1.Time{Time: lastTime} - } else { - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()} - } - - // Create jobs - js := []batchv1.Job{} - jobsToDelete := sets.NewString() - cj.Status.Active = []v1.ObjectReference{} - - for i, spec := range tc.jobSpecs { - job, err = getJobFromTemplate(&cj, startTimeStringToTime(spec.StartTime)) - if err != nil { - t.Fatalf("%s: unexpected error creating a job from template: %v", name, err) - } - - job.UID = types.UID(strconv.Itoa(i)) - job.Namespace = "" - - if spec.IsFinished { - var conditionType batchv1.JobConditionType - if spec.IsSuccessful { - conditionType = batchv1.JobComplete - } else { - conditionType = batchv1.JobFailed - } - condition := batchv1.JobCondition{Type: conditionType, Status: v1.ConditionTrue} - job.Status.Conditions = append(job.Status.Conditions, condition) - - if spec.IsStillInActiveList { - cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID}) - } - } else { - if spec.IsSuccessful || spec.IsStillInActiveList { - t.Errorf("%s: test setup error: this case makes no sense", name) - } - cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID}) - } - - js = append(js, *job) - if spec.ExpectDelete { - jobsToDelete.Insert(job.Name) - } - } - - jc := &fakeJobControl{Job: job} - cjc := &fakeCJControl{} - recorder := record.NewFakeRecorder(10) - - cleanupFinishedJobs(&cj, js, jc, cjc, recorder) - - // Check we have actually deleted the correct jobs - if len(jc.DeleteJobName) != len(jobsToDelete) { - t.Errorf("%s: expected %d job deleted, actually %d", name, len(jobsToDelete), len(jc.DeleteJobName)) - } else { - jcDeleteJobName := sets.NewString(jc.DeleteJobName...) - if !jcDeleteJobName.Equal(jobsToDelete) { - t.Errorf("%s: expected jobs: %v deleted, actually: %v deleted", name, jobsToDelete, jcDeleteJobName) - } - } - - // Check for events - expectedEvents := len(jobsToDelete) - if name == "failed list pod err" { - expectedEvents = len(tc.jobSpecs) - } - if len(recorder.Events) != expectedEvents { - t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events)) - } - - // Check for jobs still in active list - numActive := 0 - if len(cjc.Updates) != 0 { - numActive = len(cjc.Updates[len(cjc.Updates)-1].Status.Active) - } - if tc.expectActive != numActive { - t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, numActive) - } - }) - } -} - -// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs, -// but over time, all jobs run as expected (assuming Allow and no deadline). - -// TestSyncOne_Status tests cj.UpdateStatus in syncOne -func TestSyncOne_Status(t *testing.T) { - finishedJob := newJob("1") - finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batchv1.JobCondition{Type: batchv1.JobComplete, Status: v1.ConditionTrue}) - unexpectedJob := newJob("2") - missingJob := newJob("3") - - testCases := map[string]struct { - // cj spec - concurrencyPolicy batchv1.ConcurrencyPolicy - suspend bool - schedule string - deadline int64 - - // cj status - ranPreviously bool - hasFinishedJob bool - - // environment - now time.Time - hasUnexpectedJob bool - hasMissingJob bool - beingDeleted bool - - // expectations - expectCreate bool - expectDelete bool - }{ - "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F}, - "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F}, - "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F}, - "never ran, is time, A": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F}, - "never ran, is time, F": {f, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F}, - "never ran, is time, R": {R, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F}, - "never ran, is time, deleting": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, T, F, F}, - "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, F, F}, - "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, *justAfterTheHour(), F, F, F, F, F}, - "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, *justAfterTheHour(), F, F, F, T, F}, - - "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F, F}, - "prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F}, - "prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F}, - "prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F}, - "prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F, F}, - "prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F, F}, - "prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F}, - "prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F, F}, - "prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F}, - "prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F}, - "prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F}, - "prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F}, - - "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, deleting": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, T, F, F}, - "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, F, F}, - "prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, F, F}, - "prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, F, F}, - "prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, F, F}, - "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), F, F, F, F, F}, - "prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), F, F, F, F, F}, - "prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), T, F, F, F, F}, - "prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), T, F, F, F, F}, - "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), T, F, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), T, F, F, T, F}, - } - - for name, tc := range testCases { - // Don't delete the redundant define 'name' and 'tc', keep those lines for goroutines. - name := name - tc := tc - t.Run(name, func(t *testing.T) { - // Setup the test - cj := cronJob() - cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy - cj.Spec.Suspend = &tc.suspend - cj.Spec.Schedule = tc.schedule - if tc.deadline != noDead { - cj.Spec.StartingDeadlineSeconds = &tc.deadline - } - if tc.ranPreviously { - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()} - cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()} - } else { - if tc.hasFinishedJob || tc.hasUnexpectedJob || tc.hasMissingJob { - t.Errorf("%s: test setup error: this case makes no sense", name) - } - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()} - } - jobs := []batchv1.Job{} - if tc.hasFinishedJob { - ref, err := getRef(&finishedJob) - if err != nil { - t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err) - } - cj.Status.Active = []v1.ObjectReference{*ref} - jobs = append(jobs, finishedJob) - } - if tc.hasUnexpectedJob { - jobs = append(jobs, unexpectedJob) - } - if tc.hasMissingJob { - ref, err := getRef(&missingJob) - if err != nil { - t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err) - } - cj.Status.Active = append(cj.Status.Active, *ref) - } - if tc.beingDeleted { - timestamp := metav1.NewTime(tc.now) - cj.DeletionTimestamp = ×tamp - } - - jc := &fakeJobControl{} - cjc := &fakeCJControl{} - recorder := record.NewFakeRecorder(10) - - // Run the code - syncOne(&cj, jobs, tc.now, jc, cjc, recorder) - - // Status update happens once when ranging through job list, and another one if create jobs. - expectUpdates := 1 - // Events happens when there's unexpected / finished jobs, and upon job creation / deletion. - expectedEvents := 0 - if tc.expectCreate { - expectUpdates++ - expectedEvents++ - } - if tc.expectDelete { - expectedEvents++ - } - if tc.hasFinishedJob { - expectedEvents++ - } - if tc.hasUnexpectedJob { - expectedEvents++ - } - if tc.hasMissingJob { - expectedEvents++ - } - - if len(recorder.Events) != expectedEvents { - t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events) - } - - if expectUpdates != len(cjc.Updates) { - t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(cjc.Updates)) - } - - if tc.hasFinishedJob && inActiveList(cjc.Updates[0], finishedJob.UID) { - t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active) - } - - if tc.hasUnexpectedJob && inActiveList(cjc.Updates[0], unexpectedJob.UID) { - t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, cjc.Updates[0].Status.Active) - } - - if tc.hasMissingJob && inActiveList(cjc.Updates[0], missingJob.UID) { - t.Errorf("%s: expected missing job to be removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active) - } - - if tc.expectCreate && !cjc.Updates[1].Status.LastScheduleTime.Time.Equal(*topOfTheHour()) { - t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), cjc.Updates[1].Status.LastScheduleTime) - } - }) - } -} diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index 5f8e13c5623..fba64961847 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -39,6 +40,7 @@ import ( batchv1listers "k8s.io/client-go/listers/batch/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + ref "k8s.io/client-go/tools/reference" "k8s.io/client-go/util/workqueue" "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog/v2" @@ -47,6 +49,9 @@ import ( ) var ( + // controllerKind contains the schema.GroupVersionKind for this controller type. + controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob") + nextScheduleDelta = 100 * time.Millisecond ) @@ -694,3 +699,24 @@ func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bo } return false } + +// deleteJob reaps a job, deleting the job, the pods and the reference in the active list +func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool { + nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) + + // delete the job itself... + if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { + recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) + klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err) + return false + } + // ... and its reference from active list + deleteFromActiveList(cj, job.ObjectMeta.UID) + recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name) + + return true +} + +func getRef(object runtime.Object) (*corev1.ObjectReference, error) { + return ref.GetReference(scheme.Scheme, object) +} diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go index 1735d658c44..458992ad4c0 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2_test.go +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" @@ -39,6 +40,66 @@ import ( "k8s.io/kubernetes/pkg/controller" ) +var ( + shortDead int64 = 10 + mediumDead int64 = 2 * 60 * 60 + longDead int64 = 1000000 + noDead int64 = -12345 + + errorSchedule = "obvious error schedule" + // schedule is hourly on the hour + onTheHour = "0 * * * ?" + + A = batchv1.AllowConcurrent + f = batchv1.ForbidConcurrent + R = batchv1.ReplaceConcurrent + T = true + F = false +) + +// returns a cronJob with some fields filled in. +func cronJob() batchv1.CronJob { + return batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mycronjob", + Namespace: "snazzycats", + UID: types.UID("1a2b3c"), + CreationTimestamp: metav1.Time{Time: justBeforeTheHour()}, + }, + Spec: batchv1.CronJobSpec{ + Schedule: "* * * * ?", + ConcurrencyPolicy: batchv1.AllowConcurrent, + JobTemplate: batchv1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: jobSpec(), + }, + }, + } +} + +func jobSpec() batchv1.JobSpec { + one := int32(1) + return batchv1.JobSpec{ + Parallelism: &one, + Completions: &one, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Image: "foo/bar"}, + }, + }, + }, + } +} + func justASecondBeforeTheHour() time.Time { T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z") if err != nil { @@ -47,6 +108,46 @@ func justASecondBeforeTheHour() time.Time { return T1 } +func justAfterThePriorHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func justBeforeThePriorHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func justAfterTheHour() *time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z") + if err != nil { + panic("test setup error") + } + return &T1 +} + +func justBeforeTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func weekAfterTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-26T10:00:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + func TestControllerV2SyncCronJob(t *testing.T) { // Check expectations on deadline parameters if shortDead/60/60 >= 1 { diff --git a/pkg/controller/cronjob/injection.go b/pkg/controller/cronjob/injection.go index 34a5cf50ab3..35bd0f3a5c7 100644 --- a/pkg/controller/cronjob/injection.go +++ b/pkg/controller/cronjob/injection.go @@ -22,10 +22,8 @@ import ( "sync" batchv1 "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" @@ -104,22 +102,6 @@ type realJobControl struct { var _ jobControlInterface = &realJobControl{} -func copyLabels(template *batchv1.JobTemplateSpec) labels.Set { - l := make(labels.Set) - for k, v := range template.Labels { - l[k] = v - } - return l -} - -func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set { - a := make(labels.Set) - for k, v := range template.Annotations { - a[k] = v - } - return a -} - func (r realJobControl) GetJob(namespace, name string) (*batchv1.Job, error) { return r.KubeClient.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } @@ -215,59 +197,3 @@ func (f *fakeJobControl) Clear() { f.Jobs = []batchv1.Job{} f.Err = nil } - -// ------------------------------------------------------------------ // - -// podControlInterface is an interface that knows how to list or delete pods -// created as an interface to allow testing. -type podControlInterface interface { - // ListPods list pods - ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) - // DeleteJob deletes the pod identified by name. - // TODO: delete by UID? - DeletePod(namespace string, name string) error -} - -// realPodControl is the default implementation of podControlInterface. -type realPodControl struct { - KubeClient clientset.Interface - Recorder record.EventRecorder -} - -var _ podControlInterface = &realPodControl{} - -func (r realPodControl) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) { - return r.KubeClient.CoreV1().Pods(namespace).List(context.TODO(), opts) -} - -func (r realPodControl) DeletePod(namespace string, name string) error { - return r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) -} - -type fakePodControl struct { - sync.Mutex - Pods []v1.Pod - DeletePodName []string - Err error -} - -var _ podControlInterface = &fakePodControl{} - -func (f *fakePodControl) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) { - f.Lock() - defer f.Unlock() - if f.Err != nil { - return nil, f.Err - } - return &v1.PodList{Items: f.Pods}, nil -} - -func (f *fakePodControl) DeletePod(namespace string, name string) error { - f.Lock() - defer f.Unlock() - if f.Err != nil { - return f.Err - } - f.DeletePodName = append(f.DeletePodName, name) - return nil -} diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index be15ebff8b2..bd36768128b 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -21,13 +21,14 @@ import ( "time" "github.com/robfig/cron/v3" - "k8s.io/klog/v2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" ) // Utilities for dealing with Jobs and CronJobs and time. @@ -56,99 +57,6 @@ func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) { cj.Status.Active = newActive } -// getParentUIDFromJob extracts UID of job's parent and whether it was found -func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) { - controllerRef := metav1.GetControllerOf(&j) - - if controllerRef == nil { - return types.UID(""), false - } - - if controllerRef.Kind != "CronJob" { - klog.V(4).Infof("Job with non-CronJob parent, name %s namespace %s", j.Name, j.Namespace) - return types.UID(""), false - } - - return controllerRef.UID, true -} - -// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. cronJob). -// It has no receiver, to facilitate testing. -func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job { - jobsByCj := make(map[types.UID][]batchv1.Job) - for _, job := range js { - parentUID, found := getParentUIDFromJob(job) - if !found { - klog.V(4).Infof("Unable to get parent uid from job %s in namespace %s", job.Name, job.Namespace) - continue - } - jobsByCj[parentUID] = append(jobsByCj[parentUID], job) - } - return jobsByCj -} - -// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not. -// -// If there are too many (>100) unstarted times, just give up and return an empty slice. -// If there were missed times prior to the last known start time, then those are not returned. -func getRecentUnmetScheduleTimes(cj batchv1.CronJob, now time.Time) ([]time.Time, error) { - starts := []time.Time{} - sched, err := cron.ParseStandard(cj.Spec.Schedule) - if err != nil { - return starts, fmt.Errorf("unparseable schedule: %s : %s", cj.Spec.Schedule, err) - } - - var earliestTime time.Time - if cj.Status.LastScheduleTime != nil { - earliestTime = cj.Status.LastScheduleTime.Time - } else { - // If none found, then this is either a recently created cronJob, - // or the active/completed info was somehow lost (contract for status - // in kubernetes says it may need to be recreated), or that we have - // started a job, but have not noticed it yet (distributed systems can - // have arbitrary delays). In any case, use the creation time of the - // CronJob as last known start time. - earliestTime = cj.ObjectMeta.CreationTimestamp.Time - } - if cj.Spec.StartingDeadlineSeconds != nil { - // Controller is not going to schedule anything below this point - schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)) - - if schedulingDeadline.After(earliestTime) { - earliestTime = schedulingDeadline - } - } - if earliestTime.After(now) { - return []time.Time{}, nil - } - - for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) { - starts = append(starts, t) - // An object might miss several starts. For example, if - // controller gets wedged on friday at 5:01pm when everyone has - // gone home, and someone comes in on tuesday AM and discovers - // the problem and restarts the controller, then all the hourly - // jobs, more than 80 of them for one hourly cronJob, should - // all start running with no further intervention (if the cronJob - // allows concurrency and late starts). - // - // However, if there is a bug somewhere, or incorrect clock - // on controller's server or apiservers (for setting creationTimestamp) - // then there could be so many missed start times (it could be off - // by decades or more), that it would eat up all the CPU and memory - // of this controller. In that case, we want to not try to list - // all the missed start times. - // - // I've somewhat arbitrarily picked 100, as more than 80, - // but less than "lots". - if len(starts) > 100 { - // We can't get the most recent times so just return an empty slice - return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew") - } - } - return starts, nil -} - // getNextScheduleTime gets the time of next schedule after last scheduled and before now // it returns nil if no unmet schedule times. // If there are too many (>100) unstarted times, it will raise a warning and but still return @@ -232,28 +140,20 @@ func getMostRecentScheduleTime(earliestTime time.Time, now time.Time, schedule c return &t, numberOfMissedSchedules, nil } -// getJobFromTemplate makes a Job from a CronJob -func getJobFromTemplate(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) { - labels := copyLabels(&cj.Spec.JobTemplate) - annotations := copyAnnotations(&cj.Spec.JobTemplate) - // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice - name := fmt.Sprintf("%s-%d", cj.Name, getTimeHash(scheduledTime)) - - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - Annotations: annotations, - Name: name, - OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)}, - }, +func copyLabels(template *batchv1.JobTemplateSpec) labels.Set { + l := make(labels.Set) + for k, v := range template.Labels { + l[k] = v } - cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec) - return job, nil + return l } -// getTimeHash returns Unix Epoch Time -func getTimeHash(scheduledTime time.Time) int64 { - return scheduledTime.Unix() +func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set { + a := make(labels.Set) + for k, v := range template.Annotations { + a[k] = v + } + return a } // getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go index 8f27c4fcb2f..863a5d290bb 100644 --- a/pkg/controller/cronjob/utils_test.go +++ b/pkg/controller/cronjob/utils_test.go @@ -29,67 +29,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" - utilpointer "k8s.io/utils/pointer" ) -func TestGetJobFromTemplate(t *testing.T) { - // getJobFromTemplate() needs to take the job template and copy the labels and annotations - // and other fields, and add a created-by reference. - - var one int64 = 1 - var no bool - - cj := batchv1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mycronjob", - Namespace: "snazzycats", - UID: types.UID("1a2b3c"), - SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob", - }, - Spec: batchv1.CronJobSpec{ - Schedule: "* * * * ?", - ConcurrencyPolicy: batchv1.AllowConcurrent, - JobTemplate: batchv1.JobTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a": "b"}, - Annotations: map[string]string{"x": "y"}, - }, - Spec: batchv1.JobSpec{ - ActiveDeadlineSeconds: &one, - ManualSelector: &no, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - {Image: "foo/bar"}, - }, - }, - }, - }, - }, - }, - } - - var job *batchv1.Job - job, err := getJobFromTemplate(&cj, time.Time{}) - if err != nil { - t.Errorf("Did not expect error: %s", err) - } - if !strings.HasPrefix(job.ObjectMeta.Name, "mycronjob-") { - t.Errorf("Wrong Name") - } - if len(job.ObjectMeta.Labels) != 1 { - t.Errorf("Wrong number of labels") - } - if len(job.ObjectMeta.Annotations) != 1 { - t.Errorf("Wrong number of annotations") - } -} - func TestGetJobFromTemplate2(t *testing.T) { // getJobFromTemplate2() needs to take the job template and copy the labels and annotations // and other fields, and add a created-by reference. @@ -148,159 +89,6 @@ func TestGetJobFromTemplate2(t *testing.T) { } } -func TestGetParentUIDFromJob(t *testing.T) { - j := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foobar", - Namespace: metav1.NamespaceDefault, - }, - Spec: batchv1.JobSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"foo": "bar"}, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - {Image: "foo/bar"}, - }, - }, - }, - }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{{ - Type: batchv1.JobComplete, - Status: v1.ConditionTrue, - }}, - }, - } - { - // Case 1: No ControllerRef - _, found := getParentUIDFromJob(*j) - - if found { - t.Errorf("Unexpectedly found uid") - } - } - { - // Case 2: Has ControllerRef - j.ObjectMeta.SetOwnerReferences([]metav1.OwnerReference{ - { - Kind: "CronJob", - UID: types.UID("5ef034e0-1890-11e6-8935-42010af0003e"), - Controller: utilpointer.BoolPtr(true), - }, - }) - - expectedUID := types.UID("5ef034e0-1890-11e6-8935-42010af0003e") - - uid, found := getParentUIDFromJob(*j) - if !found { - t.Errorf("Unexpectedly did not find uid") - } else if uid != expectedUID { - t.Errorf("Wrong UID: %v", uid) - } - } - -} - -func TestGroupJobsByParent(t *testing.T) { - uid1 := types.UID("11111111-1111-1111-1111-111111111111") - uid2 := types.UID("22222222-2222-2222-2222-222222222222") - uid3 := types.UID("33333333-3333-3333-3333-333333333333") - - ownerReference1 := metav1.OwnerReference{ - Kind: "CronJob", - UID: uid1, - Controller: utilpointer.BoolPtr(true), - } - - ownerReference2 := metav1.OwnerReference{ - Kind: "CronJob", - UID: uid2, - Controller: utilpointer.BoolPtr(true), - } - - ownerReference3 := metav1.OwnerReference{ - Kind: "CronJob", - UID: uid3, - Controller: utilpointer.BoolPtr(true), - } - - { - // Case 1: There are no jobs and cronJobs - js := []batchv1.Job{} - jobsByCj := groupJobsByParent(js) - if len(jobsByCj) != 0 { - t.Errorf("Wrong number of items in map") - } - } - - { - // Case 2: there is one controller with one job it created. - js := []batchv1.Job{ - {ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}}, - } - jobsBySj := groupJobsByParent(js) - - if len(jobsBySj) != 1 { - t.Errorf("Wrong number of items in map") - } - jobList1, found := jobsBySj[uid1] - if !found { - t.Errorf("Key not found") - } - if len(jobList1) != 1 { - t.Errorf("Wrong number of items in map") - } - } - - { - // Case 3: Two namespaces, one has two jobs from one controller, other has 3 jobs from two controllers. - // There are also two jobs with no created-by annotation. - js := []batchv1.Job{ - {ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference2}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "d", Namespace: "x", OwnerReferences: []metav1.OwnerReference{}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "y", OwnerReferences: []metav1.OwnerReference{ownerReference3}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "y", OwnerReferences: []metav1.OwnerReference{ownerReference3}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "d", Namespace: "y", OwnerReferences: []metav1.OwnerReference{}}}, - } - - jobsBySj := groupJobsByParent(js) - - if len(jobsBySj) != 3 { - t.Errorf("Wrong number of items in map") - } - jobList1, found := jobsBySj[uid1] - if !found { - t.Errorf("Key not found") - } - if len(jobList1) != 2 { - t.Errorf("Wrong number of items in map") - } - jobList2, found := jobsBySj[uid2] - if !found { - t.Errorf("Key not found") - } - if len(jobList2) != 1 { - t.Errorf("Wrong number of items in map") - } - jobList3, found := jobsBySj[uid3] - if !found { - t.Errorf("Key not found") - } - if len(jobList3) != 2 { - t.Errorf("Wrong number of items in map") - } - } -} - func TestGetNextScheduleTime(t *testing.T) { // schedule is hourly on the hour schedule := "0 * * * ?" @@ -427,142 +215,6 @@ func TestGetNextScheduleTime(t *testing.T) { } } -func TestGetRecentUnmetScheduleTimes(t *testing.T) { - // schedule is hourly on the hour - schedule := "0 * * * ?" - // T1 is a scheduled start time of that schedule - T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") - if err != nil { - t.Errorf("test setup error: %v", err) - } - // T2 is a scheduled start time of that schedule after T1 - T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z") - if err != nil { - t.Errorf("test setup error: %v", err) - } - - cj := batchv1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mycronjob", - Namespace: metav1.NamespaceDefault, - UID: types.UID("1a2b3c"), - }, - Spec: batchv1.CronJobSpec{ - Schedule: schedule, - ConcurrencyPolicy: batchv1.AllowConcurrent, - JobTemplate: batchv1.JobTemplateSpec{}, - }, - } - { - // Case 1: no known start times, and none needed yet. - // Creation time is before T1. - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} - // Current time is more than creation time, but less than T1. - now := T1.Add(-7 * time.Minute) - times, err := getRecentUnmetScheduleTimes(cj, now) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(times) != 0 { - t.Errorf("expected no start times, got: %v", times) - } - } - { - // Case 2: no known start times, and one needed. - // Creation time is before T1. - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} - // Current time is after T1 - now := T1.Add(2 * time.Second) - times, err := getRecentUnmetScheduleTimes(cj, now) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(times) != 1 { - t.Errorf("expected 1 start time, got: %v", times) - } else if !times[0].Equal(T1) { - t.Errorf("expected: %v, got: %v", T1, times[0]) - } - } - { - // Case 3: known LastScheduleTime, no start needed. - // Creation time is before T1. - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} - // Status shows a start at the expected time. - cj.Status.LastScheduleTime = &metav1.Time{Time: T1} - // Current time is after T1 - now := T1.Add(2 * time.Minute) - times, err := getRecentUnmetScheduleTimes(cj, now) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(times) != 0 { - t.Errorf("expected 0 start times, got: %v", times) - } - } - { - // Case 4: known LastScheduleTime, a start needed - // Creation time is before T1. - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} - // Status shows a start at the expected time. - cj.Status.LastScheduleTime = &metav1.Time{Time: T1} - // Current time is after T1 and after T2 - now := T2.Add(5 * time.Minute) - times, err := getRecentUnmetScheduleTimes(cj, now) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(times) != 1 { - t.Errorf("expected 1 start times, got: %v", times) - } else if !times[0].Equal(T2) { - t.Errorf("expected: %v, got: %v", T1, times[0]) - } - } - { - // Case 5: known LastScheduleTime, two starts needed - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} - cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} - // Current time is after T1 and after T2 - now := T2.Add(5 * time.Minute) - times, err := getRecentUnmetScheduleTimes(cj, now) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(times) != 2 { - t.Errorf("expected 2 start times, got: %v", times) - } else { - if !times[0].Equal(T1) { - t.Errorf("expected: %v, got: %v", T1, times[0]) - } - if !times[1].Equal(T2) { - t.Errorf("expected: %v, got: %v", T2, times[1]) - } - } - } - { - // Case 6: now is way way ahead of last start time, and there is no deadline. - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} - cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} - now := T2.Add(10 * 24 * time.Hour) - _, err := getRecentUnmetScheduleTimes(cj, now) - if err == nil { - t.Errorf("expected an error") - } - } - { - // Case 7: now is way way ahead of last start time, but there is a short deadline. - cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} - cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} - now := T2.Add(10 * 24 * time.Hour) - // Deadline is short - deadline := int64(2 * 60 * 60) - cj.Spec.StartingDeadlineSeconds = &deadline - _, err := getRecentUnmetScheduleTimes(cj, now) - if err != nil { - t.Errorf("unexpected error") - } - } -} - func TestByJobStartTime(t *testing.T) { now := metav1.NewTime(time.Date(2018, time.January, 1, 2, 3, 4, 5, time.UTC)) later := metav1.NewTime(time.Date(2019, time.January, 1, 2, 3, 4, 5, time.UTC)) @@ -708,3 +360,20 @@ func TestGetMostRecentScheduleTime(t *testing.T) { }) } } + +func topOfTheHour() *time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") + if err != nil { + panic("test setup error") + } + return &T1 +} + +func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") + if err != nil { + panic("test setup error") + } + t := T1.Add(duration) + return &t +}