Merge pull request #106126 from soltysh/remove_old_cronjob

Remove old cronjob controller
Kubernetes Prow Robot committed on 2021-11-04 20:35:53 -07:00
commit ed42bbd722
7 changed files with 157 additions and 1708 deletions


@@ -1,387 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cronjob
/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or cronJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)
Just periodically list jobs and cronJobs, and then reconcile them.
*/
import (
"context"
"fmt"
"sort"
"time"
"k8s.io/klog/v2"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/pager"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
)
// Utilities for dealing with Jobs and CronJobs and time.
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
// Controller is a controller for CronJobs.
type Controller struct {
kubeClient clientset.Interface
jobControl jobControlInterface
cjControl cjControlInterface
podControl podControlInterface
recorder record.EventRecorder
}
// NewController creates and initializes a new Controller.
func NewController(kubeClient clientset.Interface) (*Controller, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
jm := &Controller{
kubeClient: kubeClient,
jobControl: realJobControl{KubeClient: kubeClient},
cjControl: &realCJControl{KubeClient: kubeClient},
podControl: &realPodControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
}
return jm, nil
}
// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting CronJob Manager")
// Check things every 10 seconds.
go wait.Until(jm.syncAll, 10*time.Second, stopCh)
<-stopCh
klog.Infof("Shutting down CronJob Manager")
}
// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *Controller) syncAll() {
// List children (Jobs) before parents (CronJob).
// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
// Note that this only works because we are NOT using any caches here.
jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
}
js := make([]batchv1.Job, 0)
err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
jobTmp, ok := object.(*batchv1.Job)
if !ok {
return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
}
js = append(js, *jobTmp)
return nil
})
if err != nil {
utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
return
}
klog.V(4).Infof("Found %d jobs", len(js))
jobsByCj := groupJobsByParent(js)
klog.V(4).Infof("Found %d groups", len(jobsByCj))
err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts)
}).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
cj, ok := object.(*batchv1.CronJob)
if !ok {
return fmt.Errorf("expected type *batchv1.CronJob, got type %T", cj)
}
syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
return nil
})
if err != nil {
utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
return
}
}
// cleanupFinishedJobs cleans up finished jobs created by a CronJob
func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface,
cjc cjControlInterface, recorder record.EventRecorder) {
// If neither limit is set, there is no need to do anything.
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
return
}
failedJobs := []batchv1.Job{}
successfulJobs := []batchv1.Job{}
for _, job := range js {
isFinished, finishedStatus := getFinishedStatus(&job)
if isFinished && finishedStatus == batchv1.JobComplete {
successfulJobs = append(successfulJobs, job)
} else if isFinished && finishedStatus == batchv1.JobFailed {
failedJobs = append(failedJobs, job)
}
}
if cj.Spec.SuccessfulJobsHistoryLimit != nil {
removeOldestJobs(cj,
successfulJobs,
jc,
*cj.Spec.SuccessfulJobsHistoryLimit,
recorder)
}
if cj.Spec.FailedJobsHistoryLimit != nil {
removeOldestJobs(cj,
failedJobs,
jc,
*cj.Spec.FailedJobsHistoryLimit,
recorder)
}
// Update the CronJob, in case jobs were removed from the list.
if _, err := cjc.UpdateStatus(cj); err != nil {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
}
}
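// getFinishedStatus and IsJobFinished are defined in utils.go, outside this
// diff. A minimal sketch of what they do, assuming they scan the Job's
// conditions for a JobComplete or JobFailed condition with Status True:
func getFinishedStatusSketch(j *batchv1.Job) (bool, batchv1.JobConditionType) {
	for _, c := range j.Status.Conditions {
		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
			return true, c.Type
		}
	}
	return false, ""
}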
// removeOldestJobs removes the oldest jobs from a list of jobs
func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 {
return
}
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
sort.Sort(byJobStartTime(js))
for i := 0; i < numToDelete; i++ {
klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
deleteJob(cj, &js[i], jc, recorder)
}
}
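// byJobStartTime (used by sort.Sort above) is defined in utils.go, outside
// this diff. A minimal sketch of such a sort.Interface, assuming jobs without
// a start time sort first and ties break by name:
type byJobStartTimeSketch []batchv1.Job

func (o byJobStartTimeSketch) Len() int      { return len(o) }
func (o byJobStartTimeSketch) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o byJobStartTimeSketch) Less(i, j int) bool {
	if o[i].Status.StartTime == nil && o[j].Status.StartTime != nil {
		return true
	}
	if o[i].Status.StartTime != nil && o[j].Status.StartTime == nil {
		return false
	}
	if o[i].Status.StartTime.Equal(o[j].Status.StartTime) {
		return o[i].Name < o[j].Name
	}
	return o[i].Status.StartTime.Before(o[j].Status.StartTime)
}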
// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
childrenJobs := make(map[types.UID]bool)
for _, j := range js {
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(*cj, j.ObjectMeta.UID)
if !found && !IsJobFinished(&j) {
recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
// We found an unfinished job that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.
// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
// stop users from creating jobs if they have permission. It is assumed that if a
// user has permission to create a job within a namespace, then they have permission to make any cronJob
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
// TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
} else if found && IsJobFinished(&j) {
_, status := getFinishedStatus(&j)
deleteFromActiveList(cj, j.ObjectMeta.UID)
recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
}
}
// Remove any job reference from the active list if the corresponding job does not exist any more.
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
// job running.
for _, j := range cj.Status.Active {
if found := childrenJobs[j.UID]; !found {
recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(cj, j.UID)
}
}
updatedCJ, err := cjc.UpdateStatus(cj)
if err != nil {
klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
return
}
*cj = *updatedCJ
if cj.DeletionTimestamp != nil {
// The CronJob is being deleted.
// Don't do anything other than updating status.
return
}
if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return
}
times, err := getRecentUnmetScheduleTimes(*cj, now)
if err != nil {
recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
return
}
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
if len(times) == 0 {
klog.V(4).Infof("No unmet start times for %s", nameForLog)
return
}
if len(times) > 1 {
klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
}
scheduledTime := times[len(times)-1]
tooLate := false
if cj.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
klog.V(4).Infof("Missed starting window for %s", nameForLog)
recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
// Status.LastScheduleTime, Status.LastMissedTime), so that we won't generate
// an event the next time we process it, and so a user looking at the status
// can easily see that there was a missed execution.
return
}
if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
// Regardless of which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the CronJob or the created pod).
// So it is theoretically possible to have concurrency with Forbid.
// As long as the invocations are "far enough apart in time", this usually won't happen.
//
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return
}
if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cj.Status.Active {
klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil {
recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return
}
if !deleteJob(cj, job, jc, recorder) {
return
}
}
}
jobReq, err := getJobFromTemplate(cj, scheduledTime)
if err != nil {
klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
return
}
jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
if err != nil {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
}
return
}
klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ //
// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job
// again the next time. Actually, if we re-list the CronJobs and Jobs on the next
// iteration of syncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice (name the job with hash of its
// scheduled time).
// Add the just-started job to the status list.
ref, err := getRef(jobResp)
if err != nil {
klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
} else {
cj.Status.Active = append(cj.Status.Active, *ref)
}
cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
if _, err := cjc.UpdateStatus(cj); err != nil {
klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
}
return
}
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
// delete the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return false
}
// ... and its reference from active list
deleteFromActiveList(cj, job.ObjectMeta.UID)
recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
return true
}
func getRef(object runtime.Object) (*v1.ObjectReference, error) {
return ref.GetReference(scheme.Scheme, object)
}
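// A minimal sketch of the "job name as a lock" idea mentioned in syncOne,
// assuming the Unix-epoch hash used by getTimeHash in utils.go; the helper
// name here is hypothetical. Because the name is deterministic for a given
// nominal start time, a second create for the same scheduled run fails with
// AlreadyExists instead of launching a duplicate Job.
func deterministicJobNameSketch(cj *batchv1.CronJob, scheduledTime time.Time) string {
	return fmt.Sprintf("%s-%d", cj.Name, scheduledTime.Unix())
}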


@@ -1,786 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cronjob
import (
"strconv"
"strings"
"testing"
"time"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
// For the cronjob controller to do conversions.
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
)
var (
// schedule is hourly on the hour
onTheHour = "0 * * * ?"
errorSchedule = "obvious error schedule"
)
func justBeforeTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func topOfTheHour() *time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
panic("test setup error")
}
return &T1
}
func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
panic("test setup error")
}
t := T1.Add(duration)
return &t
}
func justAfterTheHour() *time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z")
if err != nil {
panic("test setup error")
}
return &T1
}
func weekAfterTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-26T10:00:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justBeforeThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justAfterThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func startTimeStringToTime(startTime string) time.Time {
T1, err := time.Parse(time.RFC3339, startTime)
if err != nil {
panic("test setup error")
}
return T1
}
// returns a cronJob with some fields filled in.
func cronJob() batchv1.CronJob {
return batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
CreationTimestamp: metav1.Time{Time: justBeforeTheHour()},
},
Spec: batchv1.CronJobSpec{
Schedule: "* * * * ?",
ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: jobSpec(),
},
},
}
}
func jobSpec() batchv1.JobSpec {
one := int32(1)
return batchv1.JobSpec{
Parallelism: &one,
Completions: &one,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Image: "foo/bar"},
},
},
},
}
}
func newJob(UID string) batchv1.Job {
return batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(UID),
Name: "foobar",
Namespace: metav1.NamespaceDefault,
},
Spec: jobSpec(),
}
}
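// Single-letter aliases keep the test tables below compact: A, f, and R are
// the three concurrency policies; T and F stand for true and false.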
var (
shortDead int64 = 10
mediumDead int64 = 2 * 60 * 60
longDead int64 = 1000000
noDead int64 = -12345
A = batchv1.AllowConcurrent
f = batchv1.ForbidConcurrent
R = batchv1.ReplaceConcurrent
T = true
F = false
)
func TestSyncOne_RunOrNot(t *testing.T) {
// Check expectations on deadline parameters
if shortDead/60/60 >= 1 {
t.Errorf("shortDead should be less than one hour")
}
if mediumDead/60/60 < 1 || mediumDead/60/60 >= 24 {
t.Errorf("mediumDead should be between one hour and one day")
}
if longDead/60/60/24 < 10 {
t.Errorf("longDead should be at least ten days")
}
testCases := map[string]struct {
// cj spec
concurrencyPolicy batchv1.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
// cj status
ranPreviously bool
stillActive bool
// environment
now time.Time
// expectations
expectCreate bool
expectDelete bool
expectActive int
expectedWarnings int
}{
"never ran, not valid schedule, A": {A, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1},
"never ran, not valid schedule, F": {f, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1},
"never ran, not valid schedule, R": {f, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1},
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, 0, 0},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, *justAfterTheHour(), F, F, 0, 0},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, *justAfterTheHour(), T, F, 1, 0},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0},
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0},
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, 0, 0},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), F, F, 0, 0},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), T, F, 1, 0},
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0},
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0},
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0},
"still active, is time, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, 2, 0},
"still active, is time, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, 1, 0},
"still active, is time, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, T, 1, 0},
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, 1, 0},
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), F, F, 1, 0},
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), T, F, 2, 0},
// Controller should fail to schedule these, as there are too many missed starting times
// and either no deadline or a too long deadline.
"prev ran but done, long overdue, not past deadline, A": {A, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1},
"prev ran but done, long overdue, not past deadline, R": {R, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1},
"prev ran but done, long overdue, not past deadline, F": {f, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1},
"prev ran but done, long overdue, no deadline, A": {A, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1},
"prev ran but done, long overdue, no deadline, R": {R, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1},
"prev ran but done, long overdue, no deadline, F": {f, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1},
"prev ran but done, long overdue, past medium deadline, A": {A, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0},
"prev ran but done, long overdue, past short deadline, A": {A, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0},
"prev ran but done, long overdue, past medium deadline, R": {R, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0},
"prev ran but done, long overdue, past short deadline, R": {R, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0},
"prev ran but done, long overdue, past medium deadline, F": {f, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0},
"prev ran but done, long overdue, past short deadline, F": {f, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0},
}
for name, tc := range testCases {
// Don't delete the seemingly redundant redefinitions of 'name' and 'tc'; the copies are needed by the subtest goroutines.
name := name
tc := tc
t.Run(name, func(t *testing.T) {
cj := cronJob()
cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
cj.Spec.Suspend = &tc.suspend
cj.Spec.Schedule = tc.schedule
if tc.deadline != noDead {
cj.Spec.StartingDeadlineSeconds = &tc.deadline
}
var (
job *batchv1.Job
err error
)
js := []batchv1.Job{}
if tc.ranPreviously {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
job, err = getJobFromTemplate(&cj, cj.Status.LastScheduleTime.Time)
if err != nil {
t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
}
job.UID = "1234"
job.Namespace = ""
if tc.stillActive {
cj.Status.Active = []v1.ObjectReference{{UID: job.UID}}
js = append(js, *job)
}
} else {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
if tc.stillActive {
t.Errorf("%s: test setup error: this case makes no sense", name)
}
}
jc := &fakeJobControl{Job: job}
cjc := &fakeCJControl{}
recorder := record.NewFakeRecorder(10)
syncOne(&cj, js, tc.now, jc, cjc, recorder)
expectedCreates := 0
if tc.expectCreate {
expectedCreates = 1
}
if len(jc.Jobs) != expectedCreates {
t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs))
}
for i := range jc.Jobs {
job := &jc.Jobs[i]
controllerRef := metav1.GetControllerOf(job)
if controllerRef == nil {
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
} else {
if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
}
if got, want := controllerRef.Kind, "CronJob"; got != want {
t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want)
}
if got, want := controllerRef.Name, cj.Name; got != want {
t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want)
}
if got, want := controllerRef.UID, cj.UID; got != want {
t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want)
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
t.Errorf("%s: controllerRef.Controller is not set to true", name)
}
}
}
expectedDeletes := 0
if tc.expectDelete {
expectedDeletes = 1
}
if len(jc.DeleteJobName) != expectedDeletes {
t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName))
}
// The status update happens once when ranging through the job list, and once more if a job is created.
expectUpdates := 1
expectedEvents := 0
if tc.expectCreate {
expectedEvents++
expectUpdates++
}
if tc.expectDelete {
expectedEvents++
}
expectedEvents += tc.expectedWarnings
if len(recorder.Events) != expectedEvents {
t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
}
numWarnings := 0
for i := 1; i <= len(recorder.Events); i++ {
e := <-recorder.Events
if strings.HasPrefix(e, v1.EventTypeWarning) {
numWarnings++
}
}
if numWarnings != tc.expectedWarnings {
t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings)
}
if tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) {
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active))
}
})
}
}
type CleanupJobSpec struct {
StartTime string
IsFinished bool
IsSuccessful bool
ExpectDelete bool
IsStillInActiveList bool // only when IsFinished is set
}
func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
limitThree := int32(3)
limitTwo := int32(2)
limitOne := int32(1)
limitZero := int32(0)
// Starting times are assumed to be sorted by increasing start time
// in all the test cases
testCases := map[string]struct {
jobSpecs []CleanupJobSpec
now time.Time
successfulJobsHistoryLimit *int32
failedJobsHistoryLimit *int32
expectActive int
}{
"success. job limit reached": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, T, F},
{"2016-05-19T05:00:00Z", T, T, T, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitTwo, &limitOne, 1},
"success. jobs not processed by Sync yet": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, T, F},
{"2016-05-19T05:00:00Z", T, T, T, T},
{"2016-05-19T06:00:00Z", T, T, F, T},
{"2016-05-19T07:00:00Z", T, T, F, T},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, T},
}, justBeforeTheHour(), &limitTwo, &limitOne, 4},
"failed job limit reached": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, F, T, F},
{"2016-05-19T05:00:00Z", T, F, T, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitTwo, &limitTwo, 0},
"success. job limit set to zero": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, T, F},
{"2016-05-19T05:00:00Z", T, F, T, F},
{"2016-05-19T06:00:00Z", T, T, T, F},
{"2016-05-19T07:00:00Z", T, T, T, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitZero, &limitOne, 1},
"failed job limit set to zero": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, T, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, T, F},
}, justBeforeTheHour(), &limitThree, &limitZero, 1},
"no limits reached": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitThree, &limitThree, 0},
// This test case should trigger the short-circuit
"limits disabled": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), nil, nil, 0},
"success limit disabled": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), nil, &limitThree, 0},
"failure limit disabled": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitThree, nil, 0},
"no limits reached because still active": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", F, F, F, F},
{"2016-05-19T05:00:00Z", F, F, F, F},
{"2016-05-19T06:00:00Z", F, F, F, F},
{"2016-05-19T07:00:00Z", F, F, F, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", F, F, F, F},
}, justBeforeTheHour(), &limitZero, &limitZero, 6},
}
for name, tc := range testCases {
// Don't delete the seemingly redundant redefinitions of 'name' and 'tc'; the copies are needed by the subtest goroutines.
name := name
tc := tc
t.Run(name, func(t *testing.T) {
cj := cronJob()
suspend := false
cj.Spec.ConcurrencyPolicy = f
cj.Spec.Suspend = &suspend
cj.Spec.Schedule = onTheHour
cj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit
cj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit
var (
job *batchv1.Job
err error
)
// Set consistent timestamps for the CronJob
if len(tc.jobSpecs) != 0 {
firstTime := startTimeStringToTime(tc.jobSpecs[0].StartTime)
lastTime := startTimeStringToTime(tc.jobSpecs[len(tc.jobSpecs)-1].StartTime)
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime}
cj.Status.LastScheduleTime = &metav1.Time{Time: lastTime}
} else {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
}
// Create jobs
js := []batchv1.Job{}
jobsToDelete := sets.NewString()
cj.Status.Active = []v1.ObjectReference{}
for i, spec := range tc.jobSpecs {
job, err = getJobFromTemplate(&cj, startTimeStringToTime(spec.StartTime))
if err != nil {
t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
}
job.UID = types.UID(strconv.Itoa(i))
job.Namespace = ""
if spec.IsFinished {
var conditionType batchv1.JobConditionType
if spec.IsSuccessful {
conditionType = batchv1.JobComplete
} else {
conditionType = batchv1.JobFailed
}
condition := batchv1.JobCondition{Type: conditionType, Status: v1.ConditionTrue}
job.Status.Conditions = append(job.Status.Conditions, condition)
if spec.IsStillInActiveList {
cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID})
}
} else {
if spec.IsSuccessful || spec.IsStillInActiveList {
t.Errorf("%s: test setup error: this case makes no sense", name)
}
cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID})
}
js = append(js, *job)
if spec.ExpectDelete {
jobsToDelete.Insert(job.Name)
}
}
jc := &fakeJobControl{Job: job}
cjc := &fakeCJControl{}
recorder := record.NewFakeRecorder(10)
cleanupFinishedJobs(&cj, js, jc, cjc, recorder)
// Check we have actually deleted the correct jobs
if len(jc.DeleteJobName) != len(jobsToDelete) {
t.Errorf("%s: expected %d job deleted, actually %d", name, len(jobsToDelete), len(jc.DeleteJobName))
} else {
jcDeleteJobName := sets.NewString(jc.DeleteJobName...)
if !jcDeleteJobName.Equal(jobsToDelete) {
t.Errorf("%s: expected jobs: %v deleted, actually: %v deleted", name, jobsToDelete, jcDeleteJobName)
}
}
// Check for events
expectedEvents := len(jobsToDelete)
if name == "failed list pod err" {
expectedEvents = len(tc.jobSpecs)
}
if len(recorder.Events) != expectedEvents {
t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
}
// Check for jobs still in active list
numActive := 0
if len(cjc.Updates) != 0 {
numActive = len(cjc.Updates[len(cjc.Updates)-1].Status.Active)
}
if tc.expectActive != numActive {
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, numActive)
}
})
}
}
// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
// but over time, all jobs run as expected (assuming Allow and no deadline).
// TestSyncOne_Status tests cj.UpdateStatus in syncOne
func TestSyncOne_Status(t *testing.T) {
finishedJob := newJob("1")
finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batchv1.JobCondition{Type: batchv1.JobComplete, Status: v1.ConditionTrue})
unexpectedJob := newJob("2")
missingJob := newJob("3")
testCases := map[string]struct {
// cj spec
concurrencyPolicy batchv1.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
// cj status
ranPreviously bool
hasFinishedJob bool
// environment
now time.Time
hasUnexpectedJob bool
hasMissingJob bool
beingDeleted bool
// expectations
expectCreate bool
expectDelete bool
}{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F},
"never ran, is time, deleting": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, T, F, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, *justAfterTheHour(), F, F, F, F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F, F},
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F},
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F},
"prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F, F},
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F, F},
"prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F, F},
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F},
"prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, deleting": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, T, F, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), T, F, F, T, F},
}
for name, tc := range testCases {
// Don't delete the seemingly redundant redefinitions of 'name' and 'tc'; the copies are needed by the subtest goroutines.
name := name
tc := tc
t.Run(name, func(t *testing.T) {
// Setup the test
cj := cronJob()
cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
cj.Spec.Suspend = &tc.suspend
cj.Spec.Schedule = tc.schedule
if tc.deadline != noDead {
cj.Spec.StartingDeadlineSeconds = &tc.deadline
}
if tc.ranPreviously {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
} else {
if tc.hasFinishedJob || tc.hasUnexpectedJob || tc.hasMissingJob {
t.Errorf("%s: test setup error: this case makes no sense", name)
}
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
}
jobs := []batchv1.Job{}
if tc.hasFinishedJob {
ref, err := getRef(&finishedJob)
if err != nil {
t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
}
cj.Status.Active = []v1.ObjectReference{*ref}
jobs = append(jobs, finishedJob)
}
if tc.hasUnexpectedJob {
jobs = append(jobs, unexpectedJob)
}
if tc.hasMissingJob {
ref, err := getRef(&missingJob)
if err != nil {
t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
}
cj.Status.Active = append(cj.Status.Active, *ref)
}
if tc.beingDeleted {
timestamp := metav1.NewTime(tc.now)
cj.DeletionTimestamp = &timestamp
}
jc := &fakeJobControl{}
cjc := &fakeCJControl{}
recorder := record.NewFakeRecorder(10)
// Run the code
syncOne(&cj, jobs, tc.now, jc, cjc, recorder)
// The status update happens once when ranging through the job list, and once more if a job is created.
expectUpdates := 1
// Events happen when there are unexpected or finished jobs, and upon job creation or deletion.
expectedEvents := 0
if tc.expectCreate {
expectUpdates++
expectedEvents++
}
if tc.expectDelete {
expectedEvents++
}
if tc.hasFinishedJob {
expectedEvents++
}
if tc.hasUnexpectedJob {
expectedEvents++
}
if tc.hasMissingJob {
expectedEvents++
}
if len(recorder.Events) != expectedEvents {
t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events)
}
if expectUpdates != len(cjc.Updates) {
t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(cjc.Updates))
}
if tc.hasFinishedJob && inActiveList(cjc.Updates[0], finishedJob.UID) {
t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
}
if tc.hasUnexpectedJob && inActiveList(cjc.Updates[0], unexpectedJob.UID) {
t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
}
if tc.hasMissingJob && inActiveList(cjc.Updates[0], missingJob.UID) {
t.Errorf("%s: expected missing job to be removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
}
if tc.expectCreate && !cjc.Updates[1].Status.LastScheduleTime.Time.Equal(*topOfTheHour()) {
t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), cjc.Updates[1].Status.LastScheduleTime)
}
})
}
}


@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -39,6 +40,7 @@ import (
batchv1listers "k8s.io/client-go/listers/batch/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
@@ -47,6 +49,9 @@ import (
)
var (
// controllerKind contains the schema.GroupVersionKind for this controller type.
controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
nextScheduleDelta = 100 * time.Millisecond
)
@@ -694,3 +699,24 @@ func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bo
}
return false
}
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
// delete the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return false
}
// ... and its reference from active list
deleteFromActiveList(cj, job.ObjectMeta.UID)
recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
return true
}
func getRef(object runtime.Object) (*corev1.ObjectReference, error) {
return ref.GetReference(scheme.Scheme, object)
}


@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
@@ -39,6 +40,66 @@ import (
"k8s.io/kubernetes/pkg/controller"
)
var (
shortDead int64 = 10
mediumDead int64 = 2 * 60 * 60
longDead int64 = 1000000
noDead int64 = -12345
errorSchedule = "obvious error schedule"
// schedule is hourly on the hour
onTheHour = "0 * * * ?"
A = batchv1.AllowConcurrent
f = batchv1.ForbidConcurrent
R = batchv1.ReplaceConcurrent
T = true
F = false
)
// returns a cronJob with some fields filled in.
func cronJob() batchv1.CronJob {
return batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
CreationTimestamp: metav1.Time{Time: justBeforeTheHour()},
},
Spec: batchv1.CronJobSpec{
Schedule: "* * * * ?",
ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: jobSpec(),
},
},
}
}
func jobSpec() batchv1.JobSpec {
one := int32(1)
return batchv1.JobSpec{
Parallelism: &one,
Completions: &one,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Image: "foo/bar"},
},
},
},
}
}
func justASecondBeforeTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z")
if err != nil {
@@ -47,6 +108,46 @@ func justASecondBeforeTheHour() time.Time {
return T1
}
func justAfterThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justBeforeThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justAfterTheHour() *time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z")
if err != nil {
panic("test setup error")
}
return &T1
}
func justBeforeTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func weekAfterTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-26T10:00:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func TestControllerV2SyncCronJob(t *testing.T) {
// Check expectations on deadline parameters
if shortDead/60/60 >= 1 {


@@ -22,10 +22,8 @@ import (
"sync"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
@@ -104,22 +102,6 @@ type realJobControl struct {
var _ jobControlInterface = &realJobControl{}
func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
l := make(labels.Set)
for k, v := range template.Labels {
l[k] = v
}
return l
}
func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
a := make(labels.Set)
for k, v := range template.Annotations {
a[k] = v
}
return a
}
func (r realJobControl) GetJob(namespace, name string) (*batchv1.Job, error) {
return r.KubeClient.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
@@ -215,59 +197,3 @@ func (f *fakeJobControl) Clear() {
f.Jobs = []batchv1.Job{}
f.Err = nil
}
// ------------------------------------------------------------------ //
// podControlInterface is an interface that knows how to list or delete pods.
// It exists as an interface to allow testing.
type podControlInterface interface {
// ListPods lists pods.
ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error)
// DeletePod deletes the pod identified by name.
// TODO: delete by UID?
DeletePod(namespace string, name string) error
}
// realPodControl is the default implementation of podControlInterface.
type realPodControl struct {
KubeClient clientset.Interface
Recorder record.EventRecorder
}
var _ podControlInterface = &realPodControl{}
func (r realPodControl) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) {
return r.KubeClient.CoreV1().Pods(namespace).List(context.TODO(), opts)
}
func (r realPodControl) DeletePod(namespace string, name string) error {
return r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}
type fakePodControl struct {
sync.Mutex
Pods []v1.Pod
DeletePodName []string
Err error
}
var _ podControlInterface = &fakePodControl{}
func (f *fakePodControl) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
return &v1.PodList{Items: f.Pods}, nil
}
func (f *fakePodControl) DeletePod(namespace string, name string) error {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return f.Err
}
f.DeletePodName = append(f.DeletePodName, name)
return nil
}


@@ -21,13 +21,14 @@ import (
"time"
"github.com/robfig/cron/v3"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
)
// Utilities for dealing with Jobs and CronJobs and time.
@@ -56,99 +57,6 @@ func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) {
cj.Status.Active = newActive
}
// getParentUIDFromJob extracts the UID of the job's parent and reports whether it was found
func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) {
controllerRef := metav1.GetControllerOf(&j)
if controllerRef == nil {
return types.UID(""), false
}
if controllerRef.Kind != "CronJob" {
klog.V(4).Infof("Job with non-CronJob parent, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}
return controllerRef.UID, true
}
// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. cronJob).
// It has no receiver, to facilitate testing.
func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
jobsByCj := make(map[types.UID][]batchv1.Job)
for _, job := range js {
parentUID, found := getParentUIDFromJob(job)
if !found {
klog.V(4).Infof("Unable to get parent uid from job %s in namespace %s", job.Name, job.Namespace)
continue
}
jobsByCj[parentUID] = append(jobsByCj[parentUID], job)
}
return jobsByCj
}
// getRecentUnmetScheduleTimes returns a slice of times (from oldest to latest) at which a Job should have started but did not.
//
// If there are too many (>100) unstarted times, it gives up and returns an empty slice.
// Missed times prior to the last known start time are not returned.
func getRecentUnmetScheduleTimes(cj batchv1.CronJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{}
sched, err := cron.ParseStandard(cj.Spec.Schedule)
if err != nil {
return starts, fmt.Errorf("unparseable schedule: %s : %s", cj.Spec.Schedule, err)
}
var earliestTime time.Time
if cj.Status.LastScheduleTime != nil {
earliestTime = cj.Status.LastScheduleTime.Time
} else {
// If none found, then this is either a recently created cronJob,
// or the active/completed info was somehow lost (contract for status
// in kubernetes says it may need to be recreated), or that we have
// started a job, but have not noticed it yet (distributed systems can
// have arbitrary delays). In any case, use the creation time of the
// CronJob as last known start time.
earliestTime = cj.ObjectMeta.CreationTimestamp.Time
}
if cj.Spec.StartingDeadlineSeconds != nil {
// Controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
if earliestTime.After(now) {
return []time.Time{}, nil
}
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
starts = append(starts, t)
// An object might miss several starts. For example, if
// controller gets wedged on friday at 5:01pm when everyone has
// gone home, and someone comes in on tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly cronJob, should
// all start running with no further intervention (if the cronJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we want to not try to list
// all the missed start times.
//
// I've somewhat arbitrarily picked 100, as more than 80,
// but less than "lots".
if len(starts) > 100 {
// We can't get the most recent times so just return an empty slice
return []time.Time{}, fmt.Errorf("too many missed start times (> 100); set or decrease .spec.startingDeadlineSeconds, or check clock skew")
}
}
return starts, nil
}
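// A short usage sketch of the enumeration loop above; the helper name is
// hypothetical. With the hourly schedule used throughout the tests, a
// CronJob whose last known start time is 07:30 and which is synced at 10:30
// has three unmet start times: 08:00, 09:00, and 10:00.
func exampleUnmetTimes() []time.Time {
	sched, _ := cron.ParseStandard("0 * * * *")
	earliest := time.Date(2016, 5, 19, 7, 30, 0, 0, time.UTC)
	now := time.Date(2016, 5, 19, 10, 30, 0, 0, time.UTC)
	var starts []time.Time
	for t := sched.Next(earliest); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
	}
	return starts
}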
// getNextScheduleTime gets the time of the next schedule after the last scheduled time and before now.
// It returns nil if there are no unmet schedule times.
// If there are too many (>100) unstarted times, it will raise a warning but still return
@@ -232,28 +140,20 @@ func getMostRecentScheduleTime(earliestTime time.Time, now time.Time, schedule c
return &t, numberOfMissedSchedules, nil
}
// getJobFromTemplate makes a Job from a CronJob
func getJobFromTemplate(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
labels := copyLabels(&cj.Spec.JobTemplate)
annotations := copyAnnotations(&cj.Spec.JobTemplate)
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
name := fmt.Sprintf("%s-%d", cj.Name, getTimeHash(scheduledTime))
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: annotations,
Name: name,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
},
}
cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
return job, nil
}
// getTimeHash returns Unix Epoch Time
func getTimeHash(scheduledTime time.Time) int64 {
return scheduledTime.Unix()
}
func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
l := make(labels.Set)
for k, v := range template.Labels {
l[k] = v
}
return l
}
func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
a := make(labels.Set)
for k, v := range template.Annotations {
a[k] = v
}
return a
}
// getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from


@@ -29,67 +29,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
utilpointer "k8s.io/utils/pointer"
)
func TestGetJobFromTemplate(t *testing.T) {
// getJobFromTemplate() needs to take the job template and copy the labels and annotations
// and other fields, and add a created-by reference.
var one int64 = 1
var no bool
cj := batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob",
},
Spec: batchv1.CronJobSpec{
Schedule: "* * * * ?",
ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &one,
ManualSelector: &no,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Image: "foo/bar"},
},
},
},
},
},
},
}
var job *batchv1.Job
job, err := getJobFromTemplate(&cj, time.Time{})
if err != nil {
t.Errorf("Did not expect error: %s", err)
}
if !strings.HasPrefix(job.ObjectMeta.Name, "mycronjob-") {
t.Errorf("Wrong Name")
}
if len(job.ObjectMeta.Labels) != 1 {
t.Errorf("Wrong number of labels")
}
if len(job.ObjectMeta.Annotations) != 1 {
t.Errorf("Wrong number of annotations")
}
}
func TestGetJobFromTemplate2(t *testing.T) {
// getJobFromTemplate2() needs to take the job template and copy the labels and annotations
// and other fields, and add a created-by reference.
@@ -148,159 +89,6 @@ func TestGetJobFromTemplate2(t *testing.T) {
}
}
func TestGetParentUIDFromJob(t *testing.T) {
j := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: metav1.NamespaceDefault,
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Image: "foo/bar"},
},
},
},
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
}},
},
}
{
// Case 1: No ControllerRef
_, found := getParentUIDFromJob(*j)
if found {
t.Errorf("Unexpectedly found uid")
}
}
{
// Case 2: Has ControllerRef
j.ObjectMeta.SetOwnerReferences([]metav1.OwnerReference{
{
Kind: "CronJob",
UID: types.UID("5ef034e0-1890-11e6-8935-42010af0003e"),
Controller: utilpointer.BoolPtr(true),
},
})
expectedUID := types.UID("5ef034e0-1890-11e6-8935-42010af0003e")
uid, found := getParentUIDFromJob(*j)
if !found {
t.Errorf("Unexpectedly did not find uid")
} else if uid != expectedUID {
t.Errorf("Wrong UID: %v", uid)
}
}
}
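// A minimal sketch, consistent with the two cases above, of what
// getParentUIDFromJob is expected to do (the real helper lives in the
// controller package, not in this test file): resolve the controlling owner
// reference and accept it only when it points at a CronJob. The Sketch
// suffix marks this as illustrative, not the controller's own code.
func getParentUIDFromJobSketch(j batchv1.Job) (types.UID, bool) {
	controllerRef := metav1.GetControllerOf(&j)
	if controllerRef == nil || controllerRef.Kind != "CronJob" {
		// No controlling owner, or owned by something other than a
		// CronJob; either way there is no parent CronJob to report.
		return types.UID(""), false
	}
	return controllerRef.UID, true
}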
func TestGroupJobsByParent(t *testing.T) {
uid1 := types.UID("11111111-1111-1111-1111-111111111111")
uid2 := types.UID("22222222-2222-2222-2222-222222222222")
uid3 := types.UID("33333333-3333-3333-3333-333333333333")
ownerReference1 := metav1.OwnerReference{
Kind: "CronJob",
UID: uid1,
Controller: utilpointer.BoolPtr(true),
}
ownerReference2 := metav1.OwnerReference{
Kind: "CronJob",
UID: uid2,
Controller: utilpointer.BoolPtr(true),
}
ownerReference3 := metav1.OwnerReference{
Kind: "CronJob",
UID: uid3,
Controller: utilpointer.BoolPtr(true),
}
{
// Case 1: There are no jobs and cronJobs
js := []batchv1.Job{}
jobsByCj := groupJobsByParent(js)
if len(jobsByCj) != 0 {
t.Errorf("Wrong number of items in map")
}
}
{
// Case 2: there is one controller with one job it created.
js := []batchv1.Job{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}},
}
jobsBySj := groupJobsByParent(js)
if len(jobsBySj) != 1 {
t.Errorf("Wrong number of items in map")
}
jobList1, found := jobsBySj[uid1]
if !found {
t.Errorf("Key not found")
}
if len(jobList1) != 1 {
t.Errorf("Wrong number of items in map")
}
}
{
// Case 3: Two namespaces: one has three jobs from two controllers, the other has two jobs from one controller.
// There are also two jobs with no controller owner reference.
js := []batchv1.Job{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}},
{ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference2}}},
{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}},
{ObjectMeta: metav1.ObjectMeta{Name: "d", Namespace: "x", OwnerReferences: []metav1.OwnerReference{}}},
{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "y", OwnerReferences: []metav1.OwnerReference{ownerReference3}}},
{ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "y", OwnerReferences: []metav1.OwnerReference{ownerReference3}}},
{ObjectMeta: metav1.ObjectMeta{Name: "d", Namespace: "y", OwnerReferences: []metav1.OwnerReference{}}},
}
jobsBySj := groupJobsByParent(js)
if len(jobsBySj) != 3 {
t.Errorf("Wrong number of items in map")
}
jobList1, found := jobsBySj[uid1]
if !found {
t.Errorf("Key not found")
}
if len(jobList1) != 2 {
t.Errorf("Wrong number of items in map")
}
jobList2, found := jobsBySj[uid2]
if !found {
t.Errorf("Key not found")
}
if len(jobList2) != 1 {
t.Errorf("Wrong number of items in map")
}
jobList3, found := jobsBySj[uid3]
if !found {
t.Errorf("Key not found")
}
if len(jobList3) != 2 {
t.Errorf("Wrong number of items in map")
}
}
}
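// A minimal sketch, consistent with the three cases above, of the grouping
// this test exercises (the real groupJobsByParent lives in the controller
// package): bucket Jobs by the UID of their controlling CronJob, skipping
// Jobs with no recognized parent.
func groupJobsByParentSketch(js []batchv1.Job) map[types.UID][]batchv1.Job {
	jobsByCj := make(map[types.UID][]batchv1.Job)
	for _, job := range js {
		parentUID, found := getParentUIDFromJobSketch(job)
		if !found {
			continue // orphan Job: not created by any CronJob
		}
		jobsByCj[parentUID] = append(jobsByCj[parentUID], job)
	}
	return jobsByCj
}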
func TestGetNextScheduleTime(t *testing.T) {
// schedule is hourly on the hour
schedule := "0 * * * ?"
@@ -427,142 +215,6 @@
}
}
func TestGetRecentUnmetScheduleTimes(t *testing.T) {
// schedule is hourly on the hour
schedule := "0 * * * ?"
// T1 is a scheduled start time of that schedule
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
t.Errorf("test setup error: %v", err)
}
// T2 is a scheduled start time of that schedule after T1
T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z")
if err != nil {
t.Errorf("test setup error: %v", err)
}
cj := batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batchv1.CronJobSpec{
Schedule: schedule,
ConcurrencyPolicy: batchv1.AllowConcurrent,
JobTemplate: batchv1.JobTemplateSpec{},
},
}
{
// Case 1: no known start times, and none needed yet.
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is more than creation time, but less than T1.
now := T1.Add(-7 * time.Minute)
times, err := getRecentUnmetScheduleTimes(cj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 0 {
t.Errorf("expected no start times, got: %v", times)
}
}
{
// Case 2: no known start times, and one needed.
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is after T1
now := T1.Add(2 * time.Second)
times, err := getRecentUnmetScheduleTimes(cj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 1 {
t.Errorf("expected 1 start time, got: %v", times)
} else if !times[0].Equal(T1) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
}
{
// Case 3: known LastScheduleTime, no start needed.
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Status shows a start at the expected time.
cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
// Current time is after T1
now := T1.Add(2 * time.Minute)
times, err := getRecentUnmetScheduleTimes(cj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 0 {
t.Errorf("expected 0 start times, got: %v", times)
}
}
{
// Case 4: known LastScheduleTime, a start needed
// Creation time is before T1.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Status shows a start at the expected time.
cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
// Current time is after T1 and after T2
now := T2.Add(5 * time.Minute)
times, err := getRecentUnmetScheduleTimes(cj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 1 {
t.Errorf("expected 1 start times, got: %v", times)
} else if !times[0].Equal(T2) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
}
{
// Case 5: known LastScheduleTime, two starts needed
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
// Current time is after T1 and after T2
now := T2.Add(5 * time.Minute)
times, err := getRecentUnmetScheduleTimes(cj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 2 {
t.Errorf("expected 2 start times, got: %v", times)
} else {
if !times[0].Equal(T1) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
if !times[1].Equal(T2) {
t.Errorf("expected: %v, got: %v", T2, times[1])
}
}
}
{
// Case 6: now is way way ahead of last start time, and there is no deadline.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
now := T2.Add(10 * 24 * time.Hour)
_, err := getRecentUnmetScheduleTimes(cj, now)
if err == nil {
t.Errorf("expected an error")
}
}
{
// Case 7: now is way way ahead of last start time, but there is a short deadline.
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
now := T2.Add(10 * 24 * time.Hour)
// Deadline is short
deadline := int64(2 * 60 * 60)
cj.Spec.StartingDeadlineSeconds = &deadline
_, err := getRecentUnmetScheduleTimes(cj, now)
if err != nil {
t.Errorf("unexpected error")
}
}
}
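// Arithmetic behind cases 6 and 7, added for clarity (not in the original
// file): with an hourly schedule and now about 10 days past the last start,
// there are roughly 10 * 24 = 240 unmet times, which trips the > 100 cap and
// returns an error (case 6). Setting StartingDeadlineSeconds to 2*60*60 moves
// earliestTime forward to now minus two hours, so at most two starts are
// scanned and no error occurs (case 7).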
func TestByJobStartTime(t *testing.T) {
now := metav1.NewTime(time.Date(2018, time.January, 1, 2, 3, 4, 5, time.UTC))
later := metav1.NewTime(time.Date(2019, time.January, 1, 2, 3, 4, 5, time.UTC))
@@ -708,3 +360,20 @@ func TestGetMostRecentScheduleTime(t *testing.T) {
})
}
}
// topOfTheHour returns a fixed reference time (2016-05-19T10:00:00Z) used
// throughout these tests.
func topOfTheHour() *time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
panic("test setup error")
}
return &T1
}
// deltaTimeAfterTopOfTheHour returns the reference time shifted by duration.
func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time {
t := topOfTheHour().Add(duration)
return &t
}