mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Remove old cronjob controller
This commit is contained in:
parent
5446b89857
commit
5254493044
@ -1,387 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package cronjob
|
|
||||||
|
|
||||||
/*
|
|
||||||
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
|
|
||||||
expecting a large volume of jobs or cronJobs. (We are favoring correctness
|
|
||||||
over scalability. If we find a single controller thread is too slow because
|
|
||||||
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
|
|
||||||
If we find the load on the API server is too high, we can use a watch and
|
|
||||||
UndeltaStore.)
|
|
||||||
|
|
||||||
Just periodically list jobs and cronJobs, and then reconcile them.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sort"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
|
||||||
v1 "k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
"k8s.io/client-go/tools/pager"
|
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
ref "k8s.io/client-go/tools/reference"
|
|
||||||
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Utilities for dealing with Jobs and CronJobs and time.
|
|
||||||
|
|
||||||
// controllerKind contains the schema.GroupVersionKind for this controller type.
|
|
||||||
var controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
|
|
||||||
|
|
||||||
// Controller is a controller for CronJobs.
|
|
||||||
type Controller struct {
|
|
||||||
kubeClient clientset.Interface
|
|
||||||
jobControl jobControlInterface
|
|
||||||
cjControl cjControlInterface
|
|
||||||
podControl podControlInterface
|
|
||||||
recorder record.EventRecorder
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewController creates and initializes a new Controller.
|
|
||||||
func NewController(kubeClient clientset.Interface) (*Controller, error) {
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
|
||||||
eventBroadcaster.StartStructuredLogging(0)
|
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
|
|
||||||
|
|
||||||
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
|
|
||||||
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jm := &Controller{
|
|
||||||
kubeClient: kubeClient,
|
|
||||||
jobControl: realJobControl{KubeClient: kubeClient},
|
|
||||||
cjControl: &realCJControl{KubeClient: kubeClient},
|
|
||||||
podControl: &realPodControl{KubeClient: kubeClient},
|
|
||||||
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
|
|
||||||
}
|
|
||||||
|
|
||||||
return jm, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts the main goroutine responsible for watching and syncing jobs.
|
|
||||||
func (jm *Controller) Run(stopCh <-chan struct{}) {
|
|
||||||
defer utilruntime.HandleCrash()
|
|
||||||
klog.Infof("Starting CronJob Manager")
|
|
||||||
// Check things every 10 second.
|
|
||||||
go wait.Until(jm.syncAll, 10*time.Second, stopCh)
|
|
||||||
<-stopCh
|
|
||||||
klog.Infof("Shutting down CronJob Manager")
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncAll lists all the CronJobs and Jobs and reconciles them.
|
|
||||||
func (jm *Controller) syncAll() {
|
|
||||||
// List children (Jobs) before parents (CronJob).
|
|
||||||
// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
|
|
||||||
// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
|
|
||||||
// Note that this only works because we are NOT using any caches here.
|
|
||||||
jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
js := make([]batchv1.Job, 0)
|
|
||||||
err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
|
|
||||||
jobTmp, ok := object.(*batchv1.Job)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
|
|
||||||
}
|
|
||||||
js = append(js, *jobTmp)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
klog.V(4).Infof("Found %d jobs", len(js))
|
|
||||||
|
|
||||||
jobsByCj := groupJobsByParent(js)
|
|
||||||
klog.V(4).Infof("Found %d groups", len(jobsByCj))
|
|
||||||
|
|
||||||
err = pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
return jm.kubeClient.BatchV1().CronJobs(metav1.NamespaceAll).List(ctx, opts)
|
|
||||||
}).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
|
|
||||||
cj, ok := object.(*batchv1.CronJob)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("expected type *batchv1.CronJob, got type %T", cj)
|
|
||||||
}
|
|
||||||
syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
|
|
||||||
cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanupFinishedJobs cleanups finished jobs created by a CronJob
|
|
||||||
func cleanupFinishedJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface,
|
|
||||||
cjc cjControlInterface, recorder record.EventRecorder) {
|
|
||||||
// If neither limits are active, there is no need to do anything.
|
|
||||||
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
failedJobs := []batchv1.Job{}
|
|
||||||
successfulJobs := []batchv1.Job{}
|
|
||||||
|
|
||||||
for _, job := range js {
|
|
||||||
isFinished, finishedStatus := getFinishedStatus(&job)
|
|
||||||
if isFinished && finishedStatus == batchv1.JobComplete {
|
|
||||||
successfulJobs = append(successfulJobs, job)
|
|
||||||
} else if isFinished && finishedStatus == batchv1.JobFailed {
|
|
||||||
failedJobs = append(failedJobs, job)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if cj.Spec.SuccessfulJobsHistoryLimit != nil {
|
|
||||||
removeOldestJobs(cj,
|
|
||||||
successfulJobs,
|
|
||||||
jc,
|
|
||||||
*cj.Spec.SuccessfulJobsHistoryLimit,
|
|
||||||
recorder)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cj.Spec.FailedJobsHistoryLimit != nil {
|
|
||||||
removeOldestJobs(cj,
|
|
||||||
failedJobs,
|
|
||||||
jc,
|
|
||||||
*cj.Spec.FailedJobsHistoryLimit,
|
|
||||||
recorder)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the CronJob, in case jobs were removed from the list.
|
|
||||||
if _, err := cjc.UpdateStatus(cj); err != nil {
|
|
||||||
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
|
|
||||||
klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeOldestJobs removes the oldest jobs from a list of jobs
|
|
||||||
func removeOldestJobs(cj *batchv1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
|
|
||||||
numToDelete := len(js) - int(maxJobs)
|
|
||||||
if numToDelete <= 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
|
|
||||||
klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
|
|
||||||
|
|
||||||
sort.Sort(byJobStartTime(js))
|
|
||||||
for i := 0; i < numToDelete; i++ {
|
|
||||||
klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
|
|
||||||
deleteJob(cj, &js[i], jc, recorder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncOne reconciles a CronJob with a list of any Jobs that it created.
|
|
||||||
// All known jobs created by "cj" should be included in "js".
|
|
||||||
// The current time is passed in to facilitate testing.
|
|
||||||
// It has no receiver, to facilitate testing.
|
|
||||||
func syncOne(cj *batchv1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
|
|
||||||
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
|
|
||||||
|
|
||||||
childrenJobs := make(map[types.UID]bool)
|
|
||||||
for _, j := range js {
|
|
||||||
childrenJobs[j.ObjectMeta.UID] = true
|
|
||||||
found := inActiveList(*cj, j.ObjectMeta.UID)
|
|
||||||
if !found && !IsJobFinished(&j) {
|
|
||||||
recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
|
|
||||||
// We found an unfinished job that has us as the parent, but it is not in our Active list.
|
|
||||||
// This could happen if we crashed right after creating the Job and before updating the status,
|
|
||||||
// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
|
|
||||||
// a job that they wanted us to adopt.
|
|
||||||
|
|
||||||
// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
|
|
||||||
// stop users from creating jobs if they have permission. It is assumed that if a
|
|
||||||
// user has permission to create a job within a namespace, then they have permission to make any cronJob
|
|
||||||
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
|
|
||||||
// TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
|
|
||||||
} else if found && IsJobFinished(&j) {
|
|
||||||
_, status := getFinishedStatus(&j)
|
|
||||||
deleteFromActiveList(cj, j.ObjectMeta.UID)
|
|
||||||
recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove any job reference from the active list if the corresponding job does not exist any more.
|
|
||||||
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
|
|
||||||
// job running.
|
|
||||||
for _, j := range cj.Status.Active {
|
|
||||||
if found := childrenJobs[j.UID]; !found {
|
|
||||||
recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
|
|
||||||
deleteFromActiveList(cj, j.UID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
updatedCJ, err := cjc.UpdateStatus(cj)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*cj = *updatedCJ
|
|
||||||
|
|
||||||
if cj.DeletionTimestamp != nil {
|
|
||||||
// The CronJob is being deleted.
|
|
||||||
// Don't do anything other than updating status.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
|
|
||||||
klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
times, err := getRecentUnmetScheduleTimes(*cj, now)
|
|
||||||
if err != nil {
|
|
||||||
recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
|
|
||||||
klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
|
|
||||||
if len(times) == 0 {
|
|
||||||
klog.V(4).Infof("No unmet start times for %s", nameForLog)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(times) > 1 {
|
|
||||||
klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
|
|
||||||
}
|
|
||||||
|
|
||||||
scheduledTime := times[len(times)-1]
|
|
||||||
tooLate := false
|
|
||||||
if cj.Spec.StartingDeadlineSeconds != nil {
|
|
||||||
tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
|
|
||||||
}
|
|
||||||
if tooLate {
|
|
||||||
klog.V(4).Infof("Missed starting window for %s", nameForLog)
|
|
||||||
recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
|
|
||||||
// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
|
|
||||||
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
|
|
||||||
// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
|
|
||||||
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
|
|
||||||
// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
|
|
||||||
// and event the next time we process it, and also so the user looking at the status
|
|
||||||
// can see easily that there was a missed execution.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
|
|
||||||
// Regardless which source of information we use for the set of active jobs,
|
|
||||||
// there is some risk that we won't see an active job when there is one.
|
|
||||||
// (because we haven't seen the status update to the SJ or the created pod).
|
|
||||||
// So it is theoretically possible to have concurrency with Forbid.
|
|
||||||
// As long the as the invocations are "far enough apart in time", this usually won't happen.
|
|
||||||
//
|
|
||||||
// TODO: for Forbid, we could use the same name for every execution, as a lock.
|
|
||||||
// With replace, we could use a name that is deterministic per execution time.
|
|
||||||
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
|
|
||||||
klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
|
|
||||||
for _, j := range cj.Status.Active {
|
|
||||||
klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
|
|
||||||
|
|
||||||
job, err := jc.GetJob(j.Namespace, j.Name)
|
|
||||||
if err != nil {
|
|
||||||
recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !deleteJob(cj, job, jc, recorder) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jobReq, err := getJobFromTemplate(cj, scheduledTime)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
|
|
||||||
if err != nil {
|
|
||||||
// If the namespace is being torn down, we can safely ignore
|
|
||||||
// this error since all subsequent creations will fail.
|
|
||||||
if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
|
|
||||||
recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
|
|
||||||
recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------ //
|
|
||||||
|
|
||||||
// If this process restarts at this point (after posting a job, but
|
|
||||||
// before updating the status), then we might try to start the job on
|
|
||||||
// the next time. Actually, if we re-list the SJs and Jobs on the next
|
|
||||||
// iteration of syncAll, we might not see our own status update, and
|
|
||||||
// then post one again. So, we need to use the job name as a lock to
|
|
||||||
// prevent us from making the job twice (name the job with hash of its
|
|
||||||
// scheduled time).
|
|
||||||
|
|
||||||
// Add the just-started job to the status list.
|
|
||||||
ref, err := getRef(jobResp)
|
|
||||||
if err != nil {
|
|
||||||
klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
|
|
||||||
} else {
|
|
||||||
cj.Status.Active = append(cj.Status.Active, *ref)
|
|
||||||
}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
|
|
||||||
if _, err := cjc.UpdateStatus(cj); err != nil {
|
|
||||||
klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
|
|
||||||
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
|
|
||||||
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
|
|
||||||
|
|
||||||
// delete the job itself...
|
|
||||||
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
|
|
||||||
recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
|
|
||||||
klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
// ... and its reference from active list
|
|
||||||
deleteFromActiveList(cj, job.ObjectMeta.UID)
|
|
||||||
recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func getRef(object runtime.Object) (*v1.ObjectReference, error) {
|
|
||||||
return ref.GetReference(scheme.Scheme, object)
|
|
||||||
}
|
|
@ -1,786 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package cronjob
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
// For the cronjob controller to do conversions.
|
|
||||||
_ "k8s.io/kubernetes/pkg/apis/batch/install"
|
|
||||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// schedule is hourly on the hour
|
|
||||||
onTheHour = "0 * * * ?"
|
|
||||||
errorSchedule = "obvious error schedule"
|
|
||||||
)
|
|
||||||
|
|
||||||
func justBeforeTheHour() time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return T1
|
|
||||||
}
|
|
||||||
|
|
||||||
func topOfTheHour() *time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return &T1
|
|
||||||
}
|
|
||||||
|
|
||||||
func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
t := T1.Add(duration)
|
|
||||||
return &t
|
|
||||||
}
|
|
||||||
|
|
||||||
func justAfterTheHour() *time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return &T1
|
|
||||||
}
|
|
||||||
|
|
||||||
func weekAfterTheHour() time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-26T10:00:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return T1
|
|
||||||
}
|
|
||||||
|
|
||||||
func justBeforeThePriorHour() time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return T1
|
|
||||||
}
|
|
||||||
|
|
||||||
func justAfterThePriorHour() time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z")
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return T1
|
|
||||||
}
|
|
||||||
|
|
||||||
func startTimeStringToTime(startTime string) time.Time {
|
|
||||||
T1, err := time.Parse(time.RFC3339, startTime)
|
|
||||||
if err != nil {
|
|
||||||
panic("test setup error")
|
|
||||||
}
|
|
||||||
return T1
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns a cronJob with some fields filled in.
|
|
||||||
func cronJob() batchv1.CronJob {
|
|
||||||
return batchv1.CronJob{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "mycronjob",
|
|
||||||
Namespace: "snazzycats",
|
|
||||||
UID: types.UID("1a2b3c"),
|
|
||||||
CreationTimestamp: metav1.Time{Time: justBeforeTheHour()},
|
|
||||||
},
|
|
||||||
Spec: batchv1.CronJobSpec{
|
|
||||||
Schedule: "* * * * ?",
|
|
||||||
ConcurrencyPolicy: batchv1.AllowConcurrent,
|
|
||||||
JobTemplate: batchv1.JobTemplateSpec{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Labels: map[string]string{"a": "b"},
|
|
||||||
Annotations: map[string]string{"x": "y"},
|
|
||||||
},
|
|
||||||
Spec: jobSpec(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func jobSpec() batchv1.JobSpec {
|
|
||||||
one := int32(1)
|
|
||||||
return batchv1.JobSpec{
|
|
||||||
Parallelism: &one,
|
|
||||||
Completions: &one,
|
|
||||||
Template: v1.PodTemplateSpec{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Labels: map[string]string{
|
|
||||||
"foo": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Spec: v1.PodSpec{
|
|
||||||
Containers: []v1.Container{
|
|
||||||
{Image: "foo/bar"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newJob(UID string) batchv1.Job {
|
|
||||||
return batchv1.Job{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
UID: types.UID(UID),
|
|
||||||
Name: "foobar",
|
|
||||||
Namespace: metav1.NamespaceDefault,
|
|
||||||
},
|
|
||||||
Spec: jobSpec(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
shortDead int64 = 10
|
|
||||||
mediumDead int64 = 2 * 60 * 60
|
|
||||||
longDead int64 = 1000000
|
|
||||||
noDead int64 = -12345
|
|
||||||
A = batchv1.AllowConcurrent
|
|
||||||
f = batchv1.ForbidConcurrent
|
|
||||||
R = batchv1.ReplaceConcurrent
|
|
||||||
T = true
|
|
||||||
F = false
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestSyncOne_RunOrNot(t *testing.T) {
|
|
||||||
// Check expectations on deadline parameters
|
|
||||||
if shortDead/60/60 >= 1 {
|
|
||||||
t.Errorf("shortDead should be less than one hour")
|
|
||||||
}
|
|
||||||
|
|
||||||
if mediumDead/60/60 < 1 || mediumDead/60/60 >= 24 {
|
|
||||||
t.Errorf("mediumDead should be between one hour and one day")
|
|
||||||
}
|
|
||||||
|
|
||||||
if longDead/60/60/24 < 10 {
|
|
||||||
t.Errorf("longDead should be at least ten days")
|
|
||||||
}
|
|
||||||
|
|
||||||
testCases := map[string]struct {
|
|
||||||
// cj spec
|
|
||||||
concurrencyPolicy batchv1.ConcurrencyPolicy
|
|
||||||
suspend bool
|
|
||||||
schedule string
|
|
||||||
deadline int64
|
|
||||||
|
|
||||||
// cj status
|
|
||||||
ranPreviously bool
|
|
||||||
stillActive bool
|
|
||||||
|
|
||||||
// environment
|
|
||||||
now time.Time
|
|
||||||
|
|
||||||
// expectations
|
|
||||||
expectCreate bool
|
|
||||||
expectDelete bool
|
|
||||||
expectActive int
|
|
||||||
expectedWarnings int
|
|
||||||
}{
|
|
||||||
"never ran, not valid schedule, A": {A, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1},
|
|
||||||
"never ran, not valid schedule, F": {f, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1},
|
|
||||||
"never ran, not valid schedule, R": {f, F, errorSchedule, noDead, F, F, justBeforeTheHour(), F, F, 0, 1},
|
|
||||||
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0},
|
|
||||||
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0},
|
|
||||||
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0, 0},
|
|
||||||
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, 0, 0},
|
|
||||||
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, *justAfterTheHour(), F, F, 0, 0},
|
|
||||||
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
|
|
||||||
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0},
|
|
||||||
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0},
|
|
||||||
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0, 0},
|
|
||||||
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, 0, 0},
|
|
||||||
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), F, F, 0, 0},
|
|
||||||
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), T, F, 1, 0},
|
|
||||||
|
|
||||||
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0},
|
|
||||||
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0},
|
|
||||||
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1, 0},
|
|
||||||
"still active, is time, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, 2, 0},
|
|
||||||
"still active, is time, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, 1, 0},
|
|
||||||
"still active, is time, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, T, 1, 0},
|
|
||||||
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, 1, 0},
|
|
||||||
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), F, F, 1, 0},
|
|
||||||
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), T, F, 2, 0},
|
|
||||||
|
|
||||||
// Controller should fail to schedule these, as there are too many missed starting times
|
|
||||||
// and either no deadline or a too long deadline.
|
|
||||||
"prev ran but done, long overdue, not past deadline, A": {A, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1},
|
|
||||||
"prev ran but done, long overdue, not past deadline, R": {R, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1},
|
|
||||||
"prev ran but done, long overdue, not past deadline, F": {f, F, onTheHour, longDead, T, F, weekAfterTheHour(), F, F, 0, 1},
|
|
||||||
"prev ran but done, long overdue, no deadline, A": {A, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1},
|
|
||||||
"prev ran but done, long overdue, no deadline, R": {R, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1},
|
|
||||||
"prev ran but done, long overdue, no deadline, F": {f, F, onTheHour, noDead, T, F, weekAfterTheHour(), F, F, 0, 1},
|
|
||||||
|
|
||||||
"prev ran but done, long overdue, past medium deadline, A": {A, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0},
|
|
||||||
"prev ran but done, long overdue, past short deadline, A": {A, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0},
|
|
||||||
|
|
||||||
"prev ran but done, long overdue, past medium deadline, R": {R, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0},
|
|
||||||
"prev ran but done, long overdue, past short deadline, R": {R, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0},
|
|
||||||
|
|
||||||
"prev ran but done, long overdue, past medium deadline, F": {f, F, onTheHour, mediumDead, T, F, weekAfterTheHour(), T, F, 1, 0},
|
|
||||||
"prev ran but done, long overdue, past short deadline, F": {f, F, onTheHour, shortDead, T, F, weekAfterTheHour(), T, F, 1, 0},
|
|
||||||
}
|
|
||||||
for name, tc := range testCases {
|
|
||||||
// Don't delete the redundant define 'name' and 'tc', keep those lines for goroutines.
|
|
||||||
name := name
|
|
||||||
tc := tc
|
|
||||||
t.Run(name, func(t *testing.T) {
|
|
||||||
cj := cronJob()
|
|
||||||
cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
|
|
||||||
cj.Spec.Suspend = &tc.suspend
|
|
||||||
cj.Spec.Schedule = tc.schedule
|
|
||||||
if tc.deadline != noDead {
|
|
||||||
cj.Spec.StartingDeadlineSeconds = &tc.deadline
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
job *batchv1.Job
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
js := []batchv1.Job{}
|
|
||||||
if tc.ranPreviously {
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
|
|
||||||
job, err = getJobFromTemplate(&cj, cj.Status.LastScheduleTime.Time)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
|
|
||||||
}
|
|
||||||
job.UID = "1234"
|
|
||||||
job.Namespace = ""
|
|
||||||
if tc.stillActive {
|
|
||||||
cj.Status.Active = []v1.ObjectReference{{UID: job.UID}}
|
|
||||||
js = append(js, *job)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
|
|
||||||
if tc.stillActive {
|
|
||||||
t.Errorf("%s: test setup error: this case makes no sense", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jc := &fakeJobControl{Job: job}
|
|
||||||
cjc := &fakeCJControl{}
|
|
||||||
recorder := record.NewFakeRecorder(10)
|
|
||||||
|
|
||||||
syncOne(&cj, js, tc.now, jc, cjc, recorder)
|
|
||||||
expectedCreates := 0
|
|
||||||
if tc.expectCreate {
|
|
||||||
expectedCreates = 1
|
|
||||||
}
|
|
||||||
if len(jc.Jobs) != expectedCreates {
|
|
||||||
t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs))
|
|
||||||
}
|
|
||||||
for i := range jc.Jobs {
|
|
||||||
job := &jc.Jobs[i]
|
|
||||||
controllerRef := metav1.GetControllerOf(job)
|
|
||||||
if controllerRef == nil {
|
|
||||||
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
|
|
||||||
} else {
|
|
||||||
if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
|
|
||||||
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
|
|
||||||
}
|
|
||||||
if got, want := controllerRef.Kind, "CronJob"; got != want {
|
|
||||||
t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want)
|
|
||||||
}
|
|
||||||
if got, want := controllerRef.Name, cj.Name; got != want {
|
|
||||||
t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want)
|
|
||||||
}
|
|
||||||
if got, want := controllerRef.UID, cj.UID; got != want {
|
|
||||||
t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want)
|
|
||||||
}
|
|
||||||
if controllerRef.Controller == nil || *controllerRef.Controller != true {
|
|
||||||
t.Errorf("%s: controllerRef.Controller is not set to true", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedDeletes := 0
|
|
||||||
if tc.expectDelete {
|
|
||||||
expectedDeletes = 1
|
|
||||||
}
|
|
||||||
if len(jc.DeleteJobName) != expectedDeletes {
|
|
||||||
t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Status update happens once when ranging through job list, and another one if create jobs.
|
|
||||||
expectUpdates := 1
|
|
||||||
expectedEvents := 0
|
|
||||||
if tc.expectCreate {
|
|
||||||
expectedEvents++
|
|
||||||
expectUpdates++
|
|
||||||
}
|
|
||||||
if tc.expectDelete {
|
|
||||||
expectedEvents++
|
|
||||||
}
|
|
||||||
expectedEvents += tc.expectedWarnings
|
|
||||||
|
|
||||||
if len(recorder.Events) != expectedEvents {
|
|
||||||
t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
|
|
||||||
}
|
|
||||||
|
|
||||||
numWarnings := 0
|
|
||||||
for i := 1; i <= len(recorder.Events); i++ {
|
|
||||||
e := <-recorder.Events
|
|
||||||
if strings.HasPrefix(e, v1.EventTypeWarning) {
|
|
||||||
numWarnings++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if numWarnings != tc.expectedWarnings {
|
|
||||||
t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings)
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) {
|
|
||||||
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type CleanupJobSpec struct {
|
|
||||||
StartTime string
|
|
||||||
IsFinished bool
|
|
||||||
IsSuccessful bool
|
|
||||||
ExpectDelete bool
|
|
||||||
IsStillInActiveList bool // only when IsFinished is set
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
|
|
||||||
limitThree := int32(3)
|
|
||||||
limitTwo := int32(2)
|
|
||||||
limitOne := int32(1)
|
|
||||||
limitZero := int32(0)
|
|
||||||
|
|
||||||
// Starting times are assumed to be sorted by increasing start time
|
|
||||||
// in all the test cases
|
|
||||||
testCases := map[string]struct {
|
|
||||||
jobSpecs []CleanupJobSpec
|
|
||||||
now time.Time
|
|
||||||
successfulJobsHistoryLimit *int32
|
|
||||||
failedJobsHistoryLimit *int32
|
|
||||||
expectActive int
|
|
||||||
}{
|
|
||||||
"success. job limit reached": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, T, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, T, T, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), &limitTwo, &limitOne, 1},
|
|
||||||
|
|
||||||
"success. jobs not processed by Sync yet": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, T, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, T, T, T},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, T},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, T},
|
|
||||||
{"2016-05-19T08:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, T},
|
|
||||||
}, justBeforeTheHour(), &limitTwo, &limitOne, 4},
|
|
||||||
|
|
||||||
"failed job limit reached": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, F, T, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, T, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), &limitTwo, &limitTwo, 0},
|
|
||||||
|
|
||||||
"success. job limit set to zero": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, T, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, T, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, T, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, T, F},
|
|
||||||
{"2016-05-19T08:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), &limitZero, &limitOne, 1},
|
|
||||||
|
|
||||||
"failed job limit set to zero": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, T, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, T, F},
|
|
||||||
}, justBeforeTheHour(), &limitThree, &limitZero, 1},
|
|
||||||
|
|
||||||
"no limits reached": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), &limitThree, &limitThree, 0},
|
|
||||||
|
|
||||||
// This test case should trigger the short-circuit
|
|
||||||
"limits disabled": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), nil, nil, 0},
|
|
||||||
|
|
||||||
"success limit disabled": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), nil, &limitThree, 0},
|
|
||||||
|
|
||||||
"failure limit disabled": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T05:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T06:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", T, T, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", T, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", T, F, F, F},
|
|
||||||
}, justBeforeTheHour(), &limitThree, nil, 0},
|
|
||||||
|
|
||||||
"no limits reached because still active": {
|
|
||||||
[]CleanupJobSpec{
|
|
||||||
{"2016-05-19T04:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T05:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T06:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T07:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T08:00:00Z", F, F, F, F},
|
|
||||||
{"2016-05-19T09:00:00Z", F, F, F, F},
|
|
||||||
}, justBeforeTheHour(), &limitZero, &limitZero, 6},
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, tc := range testCases {
|
|
||||||
// Don't delete the redundant define 'name' and 'tc', keep those lines for goroutines.
|
|
||||||
name := name
|
|
||||||
tc := tc
|
|
||||||
t.Run(name, func(t *testing.T) {
|
|
||||||
cj := cronJob()
|
|
||||||
suspend := false
|
|
||||||
cj.Spec.ConcurrencyPolicy = f
|
|
||||||
cj.Spec.Suspend = &suspend
|
|
||||||
cj.Spec.Schedule = onTheHour
|
|
||||||
|
|
||||||
cj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit
|
|
||||||
cj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit
|
|
||||||
|
|
||||||
var (
|
|
||||||
job *batchv1.Job
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
// Set consistent timestamps for the CronJob
|
|
||||||
if len(tc.jobSpecs) != 0 {
|
|
||||||
firstTime := startTimeStringToTime(tc.jobSpecs[0].StartTime)
|
|
||||||
lastTime := startTimeStringToTime(tc.jobSpecs[len(tc.jobSpecs)-1].StartTime)
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: lastTime}
|
|
||||||
} else {
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create jobs
|
|
||||||
js := []batchv1.Job{}
|
|
||||||
jobsToDelete := sets.NewString()
|
|
||||||
cj.Status.Active = []v1.ObjectReference{}
|
|
||||||
|
|
||||||
for i, spec := range tc.jobSpecs {
|
|
||||||
job, err = getJobFromTemplate(&cj, startTimeStringToTime(spec.StartTime))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
job.UID = types.UID(strconv.Itoa(i))
|
|
||||||
job.Namespace = ""
|
|
||||||
|
|
||||||
if spec.IsFinished {
|
|
||||||
var conditionType batchv1.JobConditionType
|
|
||||||
if spec.IsSuccessful {
|
|
||||||
conditionType = batchv1.JobComplete
|
|
||||||
} else {
|
|
||||||
conditionType = batchv1.JobFailed
|
|
||||||
}
|
|
||||||
condition := batchv1.JobCondition{Type: conditionType, Status: v1.ConditionTrue}
|
|
||||||
job.Status.Conditions = append(job.Status.Conditions, condition)
|
|
||||||
|
|
||||||
if spec.IsStillInActiveList {
|
|
||||||
cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID})
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if spec.IsSuccessful || spec.IsStillInActiveList {
|
|
||||||
t.Errorf("%s: test setup error: this case makes no sense", name)
|
|
||||||
}
|
|
||||||
cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID})
|
|
||||||
}
|
|
||||||
|
|
||||||
js = append(js, *job)
|
|
||||||
if spec.ExpectDelete {
|
|
||||||
jobsToDelete.Insert(job.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jc := &fakeJobControl{Job: job}
|
|
||||||
cjc := &fakeCJControl{}
|
|
||||||
recorder := record.NewFakeRecorder(10)
|
|
||||||
|
|
||||||
cleanupFinishedJobs(&cj, js, jc, cjc, recorder)
|
|
||||||
|
|
||||||
// Check we have actually deleted the correct jobs
|
|
||||||
if len(jc.DeleteJobName) != len(jobsToDelete) {
|
|
||||||
t.Errorf("%s: expected %d job deleted, actually %d", name, len(jobsToDelete), len(jc.DeleteJobName))
|
|
||||||
} else {
|
|
||||||
jcDeleteJobName := sets.NewString(jc.DeleteJobName...)
|
|
||||||
if !jcDeleteJobName.Equal(jobsToDelete) {
|
|
||||||
t.Errorf("%s: expected jobs: %v deleted, actually: %v deleted", name, jobsToDelete, jcDeleteJobName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for events
|
|
||||||
expectedEvents := len(jobsToDelete)
|
|
||||||
if name == "failed list pod err" {
|
|
||||||
expectedEvents = len(tc.jobSpecs)
|
|
||||||
}
|
|
||||||
if len(recorder.Events) != expectedEvents {
|
|
||||||
t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for jobs still in active list
|
|
||||||
numActive := 0
|
|
||||||
if len(cjc.Updates) != 0 {
|
|
||||||
numActive = len(cjc.Updates[len(cjc.Updates)-1].Status.Active)
|
|
||||||
}
|
|
||||||
if tc.expectActive != numActive {
|
|
||||||
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, numActive)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
|
|
||||||
// but over time, all jobs run as expected (assuming Allow and no deadline).
|
|
||||||
|
|
||||||
// TestSyncOne_Status tests cj.UpdateStatus in syncOne
|
|
||||||
func TestSyncOne_Status(t *testing.T) {
|
|
||||||
finishedJob := newJob("1")
|
|
||||||
finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batchv1.JobCondition{Type: batchv1.JobComplete, Status: v1.ConditionTrue})
|
|
||||||
unexpectedJob := newJob("2")
|
|
||||||
missingJob := newJob("3")
|
|
||||||
|
|
||||||
testCases := map[string]struct {
|
|
||||||
// cj spec
|
|
||||||
concurrencyPolicy batchv1.ConcurrencyPolicy
|
|
||||||
suspend bool
|
|
||||||
schedule string
|
|
||||||
deadline int64
|
|
||||||
|
|
||||||
// cj status
|
|
||||||
ranPreviously bool
|
|
||||||
hasFinishedJob bool
|
|
||||||
|
|
||||||
// environment
|
|
||||||
now time.Time
|
|
||||||
hasUnexpectedJob bool
|
|
||||||
hasMissingJob bool
|
|
||||||
beingDeleted bool
|
|
||||||
|
|
||||||
// expectations
|
|
||||||
expectCreate bool
|
|
||||||
expectDelete bool
|
|
||||||
}{
|
|
||||||
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
|
|
||||||
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
|
|
||||||
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
|
|
||||||
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"never ran, is time, deleting": {A, F, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, T, F, F},
|
|
||||||
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, *justAfterTheHour(), F, F, F, F, F},
|
|
||||||
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, *justAfterTheHour(), F, F, F, F, F},
|
|
||||||
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
|
|
||||||
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F},
|
|
||||||
"prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F},
|
|
||||||
"prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F, F},
|
|
||||||
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F, F},
|
|
||||||
"prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F},
|
|
||||||
"prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F, F},
|
|
||||||
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F},
|
|
||||||
"prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F},
|
|
||||||
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F},
|
|
||||||
|
|
||||||
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, deleting": {A, F, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, T, F, F},
|
|
||||||
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, *justAfterTheHour(), T, F, F, F, F},
|
|
||||||
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, *justAfterTheHour(), T, F, F, F, F},
|
|
||||||
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), F, F, F, F, F},
|
|
||||||
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, *justAfterTheHour(), T, F, F, F, F},
|
|
||||||
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, *justAfterTheHour(), T, F, F, F, F},
|
|
||||||
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), F, F, F, T, F},
|
|
||||||
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, *justAfterTheHour(), T, F, F, T, F},
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, tc := range testCases {
|
|
||||||
// Don't delete the redundant define 'name' and 'tc', keep those lines for goroutines.
|
|
||||||
name := name
|
|
||||||
tc := tc
|
|
||||||
t.Run(name, func(t *testing.T) {
|
|
||||||
// Setup the test
|
|
||||||
cj := cronJob()
|
|
||||||
cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
|
|
||||||
cj.Spec.Suspend = &tc.suspend
|
|
||||||
cj.Spec.Schedule = tc.schedule
|
|
||||||
if tc.deadline != noDead {
|
|
||||||
cj.Spec.StartingDeadlineSeconds = &tc.deadline
|
|
||||||
}
|
|
||||||
if tc.ranPreviously {
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
|
|
||||||
} else {
|
|
||||||
if tc.hasFinishedJob || tc.hasUnexpectedJob || tc.hasMissingJob {
|
|
||||||
t.Errorf("%s: test setup error: this case makes no sense", name)
|
|
||||||
}
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
|
|
||||||
}
|
|
||||||
jobs := []batchv1.Job{}
|
|
||||||
if tc.hasFinishedJob {
|
|
||||||
ref, err := getRef(&finishedJob)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
|
|
||||||
}
|
|
||||||
cj.Status.Active = []v1.ObjectReference{*ref}
|
|
||||||
jobs = append(jobs, finishedJob)
|
|
||||||
}
|
|
||||||
if tc.hasUnexpectedJob {
|
|
||||||
jobs = append(jobs, unexpectedJob)
|
|
||||||
}
|
|
||||||
if tc.hasMissingJob {
|
|
||||||
ref, err := getRef(&missingJob)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
|
|
||||||
}
|
|
||||||
cj.Status.Active = append(cj.Status.Active, *ref)
|
|
||||||
}
|
|
||||||
if tc.beingDeleted {
|
|
||||||
timestamp := metav1.NewTime(tc.now)
|
|
||||||
cj.DeletionTimestamp = ×tamp
|
|
||||||
}
|
|
||||||
|
|
||||||
jc := &fakeJobControl{}
|
|
||||||
cjc := &fakeCJControl{}
|
|
||||||
recorder := record.NewFakeRecorder(10)
|
|
||||||
|
|
||||||
// Run the code
|
|
||||||
syncOne(&cj, jobs, tc.now, jc, cjc, recorder)
|
|
||||||
|
|
||||||
// Status update happens once when ranging through job list, and another one if create jobs.
|
|
||||||
expectUpdates := 1
|
|
||||||
// Events happens when there's unexpected / finished jobs, and upon job creation / deletion.
|
|
||||||
expectedEvents := 0
|
|
||||||
if tc.expectCreate {
|
|
||||||
expectUpdates++
|
|
||||||
expectedEvents++
|
|
||||||
}
|
|
||||||
if tc.expectDelete {
|
|
||||||
expectedEvents++
|
|
||||||
}
|
|
||||||
if tc.hasFinishedJob {
|
|
||||||
expectedEvents++
|
|
||||||
}
|
|
||||||
if tc.hasUnexpectedJob {
|
|
||||||
expectedEvents++
|
|
||||||
}
|
|
||||||
if tc.hasMissingJob {
|
|
||||||
expectedEvents++
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(recorder.Events) != expectedEvents {
|
|
||||||
t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events)
|
|
||||||
}
|
|
||||||
|
|
||||||
if expectUpdates != len(cjc.Updates) {
|
|
||||||
t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(cjc.Updates))
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.hasFinishedJob && inActiveList(cjc.Updates[0], finishedJob.UID) {
|
|
||||||
t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.hasUnexpectedJob && inActiveList(cjc.Updates[0], unexpectedJob.UID) {
|
|
||||||
t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.hasMissingJob && inActiveList(cjc.Updates[0], missingJob.UID) {
|
|
||||||
t.Errorf("%s: expected missing job to be removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc.expectCreate && !cjc.Updates[1].Status.LastScheduleTime.Time.Equal(*topOfTheHour()) {
|
|
||||||
t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), cjc.Updates[1].Status.LastScheduleTime)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -29,6 +29,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -39,6 +40,7 @@ import (
|
|||||||
batchv1listers "k8s.io/client-go/listers/batch/v1"
|
batchv1listers "k8s.io/client-go/listers/batch/v1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
ref "k8s.io/client-go/tools/reference"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
@ -47,6 +49,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// controllerKind contains the schema.GroupVersionKind for this controller type.
|
||||||
|
controllerKind = batchv1.SchemeGroupVersion.WithKind("CronJob")
|
||||||
|
|
||||||
nextScheduleDelta = 100 * time.Millisecond
|
nextScheduleDelta = 100 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -694,3 +699,24 @@ func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bo
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
|
||||||
|
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
|
||||||
|
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
|
||||||
|
|
||||||
|
// delete the job itself...
|
||||||
|
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
|
||||||
|
recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
|
||||||
|
klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// ... and its reference from active list
|
||||||
|
deleteFromActiveList(cj, job.ObjectMeta.UID)
|
||||||
|
recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRef(object runtime.Object) (*corev1.ObjectReference, error) {
|
||||||
|
return ref.GetReference(scheme.Scheme, object)
|
||||||
|
}
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
@ -39,6 +40,66 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
shortDead int64 = 10
|
||||||
|
mediumDead int64 = 2 * 60 * 60
|
||||||
|
longDead int64 = 1000000
|
||||||
|
noDead int64 = -12345
|
||||||
|
|
||||||
|
errorSchedule = "obvious error schedule"
|
||||||
|
// schedule is hourly on the hour
|
||||||
|
onTheHour = "0 * * * ?"
|
||||||
|
|
||||||
|
A = batchv1.AllowConcurrent
|
||||||
|
f = batchv1.ForbidConcurrent
|
||||||
|
R = batchv1.ReplaceConcurrent
|
||||||
|
T = true
|
||||||
|
F = false
|
||||||
|
)
|
||||||
|
|
||||||
|
// returns a cronJob with some fields filled in.
|
||||||
|
func cronJob() batchv1.CronJob {
|
||||||
|
return batchv1.CronJob{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "mycronjob",
|
||||||
|
Namespace: "snazzycats",
|
||||||
|
UID: types.UID("1a2b3c"),
|
||||||
|
CreationTimestamp: metav1.Time{Time: justBeforeTheHour()},
|
||||||
|
},
|
||||||
|
Spec: batchv1.CronJobSpec{
|
||||||
|
Schedule: "* * * * ?",
|
||||||
|
ConcurrencyPolicy: batchv1.AllowConcurrent,
|
||||||
|
JobTemplate: batchv1.JobTemplateSpec{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Labels: map[string]string{"a": "b"},
|
||||||
|
Annotations: map[string]string{"x": "y"},
|
||||||
|
},
|
||||||
|
Spec: jobSpec(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func jobSpec() batchv1.JobSpec {
|
||||||
|
one := int32(1)
|
||||||
|
return batchv1.JobSpec{
|
||||||
|
Parallelism: &one,
|
||||||
|
Completions: &one,
|
||||||
|
Template: v1.PodTemplateSpec{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Containers: []v1.Container{
|
||||||
|
{Image: "foo/bar"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func justASecondBeforeTheHour() time.Time {
|
func justASecondBeforeTheHour() time.Time {
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z")
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -47,6 +108,46 @@ func justASecondBeforeTheHour() time.Time {
|
|||||||
return T1
|
return T1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func justAfterThePriorHour() time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
return T1
|
||||||
|
}
|
||||||
|
|
||||||
|
func justBeforeThePriorHour() time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
return T1
|
||||||
|
}
|
||||||
|
|
||||||
|
func justAfterTheHour() *time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
return &T1
|
||||||
|
}
|
||||||
|
|
||||||
|
func justBeforeTheHour() time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
return T1
|
||||||
|
}
|
||||||
|
|
||||||
|
func weekAfterTheHour() time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-26T10:00:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
return T1
|
||||||
|
}
|
||||||
|
|
||||||
func TestControllerV2SyncCronJob(t *testing.T) {
|
func TestControllerV2SyncCronJob(t *testing.T) {
|
||||||
// Check expectations on deadline parameters
|
// Check expectations on deadline parameters
|
||||||
if shortDead/60/60 >= 1 {
|
if shortDead/60/60 >= 1 {
|
||||||
|
@ -22,10 +22,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
@ -104,22 +102,6 @@ type realJobControl struct {
|
|||||||
|
|
||||||
var _ jobControlInterface = &realJobControl{}
|
var _ jobControlInterface = &realJobControl{}
|
||||||
|
|
||||||
func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
|
|
||||||
l := make(labels.Set)
|
|
||||||
for k, v := range template.Labels {
|
|
||||||
l[k] = v
|
|
||||||
}
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
|
|
||||||
a := make(labels.Set)
|
|
||||||
for k, v := range template.Annotations {
|
|
||||||
a[k] = v
|
|
||||||
}
|
|
||||||
return a
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r realJobControl) GetJob(namespace, name string) (*batchv1.Job, error) {
|
func (r realJobControl) GetJob(namespace, name string) (*batchv1.Job, error) {
|
||||||
return r.KubeClient.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
|
return r.KubeClient.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
|
||||||
}
|
}
|
||||||
@ -215,59 +197,3 @@ func (f *fakeJobControl) Clear() {
|
|||||||
f.Jobs = []batchv1.Job{}
|
f.Jobs = []batchv1.Job{}
|
||||||
f.Err = nil
|
f.Err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------------ //
|
|
||||||
|
|
||||||
// podControlInterface is an interface that knows how to list or delete pods
|
|
||||||
// created as an interface to allow testing.
|
|
||||||
type podControlInterface interface {
|
|
||||||
// ListPods list pods
|
|
||||||
ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error)
|
|
||||||
// DeleteJob deletes the pod identified by name.
|
|
||||||
// TODO: delete by UID?
|
|
||||||
DeletePod(namespace string, name string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// realPodControl is the default implementation of podControlInterface.
|
|
||||||
type realPodControl struct {
|
|
||||||
KubeClient clientset.Interface
|
|
||||||
Recorder record.EventRecorder
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ podControlInterface = &realPodControl{}
|
|
||||||
|
|
||||||
func (r realPodControl) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) {
|
|
||||||
return r.KubeClient.CoreV1().Pods(namespace).List(context.TODO(), opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r realPodControl) DeletePod(namespace string, name string) error {
|
|
||||||
return r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakePodControl struct {
|
|
||||||
sync.Mutex
|
|
||||||
Pods []v1.Pod
|
|
||||||
DeletePodName []string
|
|
||||||
Err error
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ podControlInterface = &fakePodControl{}
|
|
||||||
|
|
||||||
func (f *fakePodControl) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) {
|
|
||||||
f.Lock()
|
|
||||||
defer f.Unlock()
|
|
||||||
if f.Err != nil {
|
|
||||||
return nil, f.Err
|
|
||||||
}
|
|
||||||
return &v1.PodList{Items: f.Pods}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakePodControl) DeletePod(namespace string, name string) error {
|
|
||||||
f.Lock()
|
|
||||||
defer f.Unlock()
|
|
||||||
if f.Err != nil {
|
|
||||||
return f.Err
|
|
||||||
}
|
|
||||||
f.DeletePodName = append(f.DeletePodName, name)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -21,13 +21,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Utilities for dealing with Jobs and CronJobs and time.
|
// Utilities for dealing with Jobs and CronJobs and time.
|
||||||
@ -56,99 +57,6 @@ func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) {
|
|||||||
cj.Status.Active = newActive
|
cj.Status.Active = newActive
|
||||||
}
|
}
|
||||||
|
|
||||||
// getParentUIDFromJob extracts UID of job's parent and whether it was found
|
|
||||||
func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) {
|
|
||||||
controllerRef := metav1.GetControllerOf(&j)
|
|
||||||
|
|
||||||
if controllerRef == nil {
|
|
||||||
return types.UID(""), false
|
|
||||||
}
|
|
||||||
|
|
||||||
if controllerRef.Kind != "CronJob" {
|
|
||||||
klog.V(4).Infof("Job with non-CronJob parent, name %s namespace %s", j.Name, j.Namespace)
|
|
||||||
return types.UID(""), false
|
|
||||||
}
|
|
||||||
|
|
||||||
return controllerRef.UID, true
|
|
||||||
}
|
|
||||||
|
|
||||||
// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. cronJob).
|
|
||||||
// It has no receiver, to facilitate testing.
|
|
||||||
func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
|
|
||||||
jobsByCj := make(map[types.UID][]batchv1.Job)
|
|
||||||
for _, job := range js {
|
|
||||||
parentUID, found := getParentUIDFromJob(job)
|
|
||||||
if !found {
|
|
||||||
klog.V(4).Infof("Unable to get parent uid from job %s in namespace %s", job.Name, job.Namespace)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
jobsByCj[parentUID] = append(jobsByCj[parentUID], job)
|
|
||||||
}
|
|
||||||
return jobsByCj
|
|
||||||
}
|
|
||||||
|
|
||||||
// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
|
|
||||||
//
|
|
||||||
// If there are too many (>100) unstarted times, just give up and return an empty slice.
|
|
||||||
// If there were missed times prior to the last known start time, then those are not returned.
|
|
||||||
func getRecentUnmetScheduleTimes(cj batchv1.CronJob, now time.Time) ([]time.Time, error) {
|
|
||||||
starts := []time.Time{}
|
|
||||||
sched, err := cron.ParseStandard(cj.Spec.Schedule)
|
|
||||||
if err != nil {
|
|
||||||
return starts, fmt.Errorf("unparseable schedule: %s : %s", cj.Spec.Schedule, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var earliestTime time.Time
|
|
||||||
if cj.Status.LastScheduleTime != nil {
|
|
||||||
earliestTime = cj.Status.LastScheduleTime.Time
|
|
||||||
} else {
|
|
||||||
// If none found, then this is either a recently created cronJob,
|
|
||||||
// or the active/completed info was somehow lost (contract for status
|
|
||||||
// in kubernetes says it may need to be recreated), or that we have
|
|
||||||
// started a job, but have not noticed it yet (distributed systems can
|
|
||||||
// have arbitrary delays). In any case, use the creation time of the
|
|
||||||
// CronJob as last known start time.
|
|
||||||
earliestTime = cj.ObjectMeta.CreationTimestamp.Time
|
|
||||||
}
|
|
||||||
if cj.Spec.StartingDeadlineSeconds != nil {
|
|
||||||
// Controller is not going to schedule anything below this point
|
|
||||||
schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
|
|
||||||
|
|
||||||
if schedulingDeadline.After(earliestTime) {
|
|
||||||
earliestTime = schedulingDeadline
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if earliestTime.After(now) {
|
|
||||||
return []time.Time{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
|
|
||||||
starts = append(starts, t)
|
|
||||||
// An object might miss several starts. For example, if
|
|
||||||
// controller gets wedged on friday at 5:01pm when everyone has
|
|
||||||
// gone home, and someone comes in on tuesday AM and discovers
|
|
||||||
// the problem and restarts the controller, then all the hourly
|
|
||||||
// jobs, more than 80 of them for one hourly cronJob, should
|
|
||||||
// all start running with no further intervention (if the cronJob
|
|
||||||
// allows concurrency and late starts).
|
|
||||||
//
|
|
||||||
// However, if there is a bug somewhere, or incorrect clock
|
|
||||||
// on controller's server or apiservers (for setting creationTimestamp)
|
|
||||||
// then there could be so many missed start times (it could be off
|
|
||||||
// by decades or more), that it would eat up all the CPU and memory
|
|
||||||
// of this controller. In that case, we want to not try to list
|
|
||||||
// all the missed start times.
|
|
||||||
//
|
|
||||||
// I've somewhat arbitrarily picked 100, as more than 80,
|
|
||||||
// but less than "lots".
|
|
||||||
if len(starts) > 100 {
|
|
||||||
// We can't get the most recent times so just return an empty slice
|
|
||||||
return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return starts, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getNextScheduleTime gets the time of next schedule after last scheduled and before now
|
// getNextScheduleTime gets the time of next schedule after last scheduled and before now
|
||||||
// it returns nil if no unmet schedule times.
|
// it returns nil if no unmet schedule times.
|
||||||
// If there are too many (>100) unstarted times, it will raise a warning and but still return
|
// If there are too many (>100) unstarted times, it will raise a warning and but still return
|
||||||
@ -232,28 +140,20 @@ func getMostRecentScheduleTime(earliestTime time.Time, now time.Time, schedule c
|
|||||||
return &t, numberOfMissedSchedules, nil
|
return &t, numberOfMissedSchedules, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getJobFromTemplate makes a Job from a CronJob
|
func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
|
||||||
func getJobFromTemplate(cj *batchv1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
|
l := make(labels.Set)
|
||||||
labels := copyLabels(&cj.Spec.JobTemplate)
|
for k, v := range template.Labels {
|
||||||
annotations := copyAnnotations(&cj.Spec.JobTemplate)
|
l[k] = v
|
||||||
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
|
|
||||||
name := fmt.Sprintf("%s-%d", cj.Name, getTimeHash(scheduledTime))
|
|
||||||
|
|
||||||
job := &batchv1.Job{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Labels: labels,
|
|
||||||
Annotations: annotations,
|
|
||||||
Name: name,
|
|
||||||
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
|
return l
|
||||||
return job, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTimeHash returns Unix Epoch Time
|
func copyAnnotations(template *batchv1.JobTemplateSpec) labels.Set {
|
||||||
func getTimeHash(scheduledTime time.Time) int64 {
|
a := make(labels.Set)
|
||||||
return scheduledTime.Unix()
|
for k, v := range template.Annotations {
|
||||||
|
a[k] = v
|
||||||
|
}
|
||||||
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
// getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from
|
// getJobFromTemplate2 makes a Job from a CronJob. It converts the unix time into minutes from
|
||||||
|
@ -29,67 +29,8 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
utilpointer "k8s.io/utils/pointer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetJobFromTemplate(t *testing.T) {
|
|
||||||
// getJobFromTemplate() needs to take the job template and copy the labels and annotations
|
|
||||||
// and other fields, and add a created-by reference.
|
|
||||||
|
|
||||||
var one int64 = 1
|
|
||||||
var no bool
|
|
||||||
|
|
||||||
cj := batchv1.CronJob{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "mycronjob",
|
|
||||||
Namespace: "snazzycats",
|
|
||||||
UID: types.UID("1a2b3c"),
|
|
||||||
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/mycronjob",
|
|
||||||
},
|
|
||||||
Spec: batchv1.CronJobSpec{
|
|
||||||
Schedule: "* * * * ?",
|
|
||||||
ConcurrencyPolicy: batchv1.AllowConcurrent,
|
|
||||||
JobTemplate: batchv1.JobTemplateSpec{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Labels: map[string]string{"a": "b"},
|
|
||||||
Annotations: map[string]string{"x": "y"},
|
|
||||||
},
|
|
||||||
Spec: batchv1.JobSpec{
|
|
||||||
ActiveDeadlineSeconds: &one,
|
|
||||||
ManualSelector: &no,
|
|
||||||
Template: v1.PodTemplateSpec{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Labels: map[string]string{
|
|
||||||
"foo": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Spec: v1.PodSpec{
|
|
||||||
Containers: []v1.Container{
|
|
||||||
{Image: "foo/bar"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
var job *batchv1.Job
|
|
||||||
job, err := getJobFromTemplate(&cj, time.Time{})
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Did not expect error: %s", err)
|
|
||||||
}
|
|
||||||
if !strings.HasPrefix(job.ObjectMeta.Name, "mycronjob-") {
|
|
||||||
t.Errorf("Wrong Name")
|
|
||||||
}
|
|
||||||
if len(job.ObjectMeta.Labels) != 1 {
|
|
||||||
t.Errorf("Wrong number of labels")
|
|
||||||
}
|
|
||||||
if len(job.ObjectMeta.Annotations) != 1 {
|
|
||||||
t.Errorf("Wrong number of annotations")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetJobFromTemplate2(t *testing.T) {
|
func TestGetJobFromTemplate2(t *testing.T) {
|
||||||
// getJobFromTemplate2() needs to take the job template and copy the labels and annotations
|
// getJobFromTemplate2() needs to take the job template and copy the labels and annotations
|
||||||
// and other fields, and add a created-by reference.
|
// and other fields, and add a created-by reference.
|
||||||
@ -148,159 +89,6 @@ func TestGetJobFromTemplate2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetParentUIDFromJob(t *testing.T) {
|
|
||||||
j := &batchv1.Job{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "foobar",
|
|
||||||
Namespace: metav1.NamespaceDefault,
|
|
||||||
},
|
|
||||||
Spec: batchv1.JobSpec{
|
|
||||||
Selector: &metav1.LabelSelector{
|
|
||||||
MatchLabels: map[string]string{"foo": "bar"},
|
|
||||||
},
|
|
||||||
Template: v1.PodTemplateSpec{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Labels: map[string]string{
|
|
||||||
"foo": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Spec: v1.PodSpec{
|
|
||||||
Containers: []v1.Container{
|
|
||||||
{Image: "foo/bar"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Status: batchv1.JobStatus{
|
|
||||||
Conditions: []batchv1.JobCondition{{
|
|
||||||
Type: batchv1.JobComplete,
|
|
||||||
Status: v1.ConditionTrue,
|
|
||||||
}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 1: No ControllerRef
|
|
||||||
_, found := getParentUIDFromJob(*j)
|
|
||||||
|
|
||||||
if found {
|
|
||||||
t.Errorf("Unexpectedly found uid")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 2: Has ControllerRef
|
|
||||||
j.ObjectMeta.SetOwnerReferences([]metav1.OwnerReference{
|
|
||||||
{
|
|
||||||
Kind: "CronJob",
|
|
||||||
UID: types.UID("5ef034e0-1890-11e6-8935-42010af0003e"),
|
|
||||||
Controller: utilpointer.BoolPtr(true),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
expectedUID := types.UID("5ef034e0-1890-11e6-8935-42010af0003e")
|
|
||||||
|
|
||||||
uid, found := getParentUIDFromJob(*j)
|
|
||||||
if !found {
|
|
||||||
t.Errorf("Unexpectedly did not find uid")
|
|
||||||
} else if uid != expectedUID {
|
|
||||||
t.Errorf("Wrong UID: %v", uid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGroupJobsByParent(t *testing.T) {
|
|
||||||
uid1 := types.UID("11111111-1111-1111-1111-111111111111")
|
|
||||||
uid2 := types.UID("22222222-2222-2222-2222-222222222222")
|
|
||||||
uid3 := types.UID("33333333-3333-3333-3333-333333333333")
|
|
||||||
|
|
||||||
ownerReference1 := metav1.OwnerReference{
|
|
||||||
Kind: "CronJob",
|
|
||||||
UID: uid1,
|
|
||||||
Controller: utilpointer.BoolPtr(true),
|
|
||||||
}
|
|
||||||
|
|
||||||
ownerReference2 := metav1.OwnerReference{
|
|
||||||
Kind: "CronJob",
|
|
||||||
UID: uid2,
|
|
||||||
Controller: utilpointer.BoolPtr(true),
|
|
||||||
}
|
|
||||||
|
|
||||||
ownerReference3 := metav1.OwnerReference{
|
|
||||||
Kind: "CronJob",
|
|
||||||
UID: uid3,
|
|
||||||
Controller: utilpointer.BoolPtr(true),
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// Case 1: There are no jobs and cronJobs
|
|
||||||
js := []batchv1.Job{}
|
|
||||||
jobsByCj := groupJobsByParent(js)
|
|
||||||
if len(jobsByCj) != 0 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// Case 2: there is one controller with one job it created.
|
|
||||||
js := []batchv1.Job{
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}},
|
|
||||||
}
|
|
||||||
jobsBySj := groupJobsByParent(js)
|
|
||||||
|
|
||||||
if len(jobsBySj) != 1 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
jobList1, found := jobsBySj[uid1]
|
|
||||||
if !found {
|
|
||||||
t.Errorf("Key not found")
|
|
||||||
}
|
|
||||||
if len(jobList1) != 1 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// Case 3: Two namespaces, one has two jobs from one controller, other has 3 jobs from two controllers.
|
|
||||||
// There are also two jobs with no created-by annotation.
|
|
||||||
js := []batchv1.Job{
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}},
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference2}}},
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "x", OwnerReferences: []metav1.OwnerReference{ownerReference1}}},
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "d", Namespace: "x", OwnerReferences: []metav1.OwnerReference{}}},
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "y", OwnerReferences: []metav1.OwnerReference{ownerReference3}}},
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "y", OwnerReferences: []metav1.OwnerReference{ownerReference3}}},
|
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "d", Namespace: "y", OwnerReferences: []metav1.OwnerReference{}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
jobsBySj := groupJobsByParent(js)
|
|
||||||
|
|
||||||
if len(jobsBySj) != 3 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
jobList1, found := jobsBySj[uid1]
|
|
||||||
if !found {
|
|
||||||
t.Errorf("Key not found")
|
|
||||||
}
|
|
||||||
if len(jobList1) != 2 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
jobList2, found := jobsBySj[uid2]
|
|
||||||
if !found {
|
|
||||||
t.Errorf("Key not found")
|
|
||||||
}
|
|
||||||
if len(jobList2) != 1 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
jobList3, found := jobsBySj[uid3]
|
|
||||||
if !found {
|
|
||||||
t.Errorf("Key not found")
|
|
||||||
}
|
|
||||||
if len(jobList3) != 2 {
|
|
||||||
t.Errorf("Wrong number of items in map")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetNextScheduleTime(t *testing.T) {
|
func TestGetNextScheduleTime(t *testing.T) {
|
||||||
// schedule is hourly on the hour
|
// schedule is hourly on the hour
|
||||||
schedule := "0 * * * ?"
|
schedule := "0 * * * ?"
|
||||||
@ -427,142 +215,6 @@ func TestGetNextScheduleTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetRecentUnmetScheduleTimes(t *testing.T) {
|
|
||||||
// schedule is hourly on the hour
|
|
||||||
schedule := "0 * * * ?"
|
|
||||||
// T1 is a scheduled start time of that schedule
|
|
||||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("test setup error: %v", err)
|
|
||||||
}
|
|
||||||
// T2 is a scheduled start time of that schedule after T1
|
|
||||||
T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("test setup error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cj := batchv1.CronJob{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "mycronjob",
|
|
||||||
Namespace: metav1.NamespaceDefault,
|
|
||||||
UID: types.UID("1a2b3c"),
|
|
||||||
},
|
|
||||||
Spec: batchv1.CronJobSpec{
|
|
||||||
Schedule: schedule,
|
|
||||||
ConcurrencyPolicy: batchv1.AllowConcurrent,
|
|
||||||
JobTemplate: batchv1.JobTemplateSpec{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 1: no known start times, and none needed yet.
|
|
||||||
// Creation time is before T1.
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
|
|
||||||
// Current time is more than creation time, but less than T1.
|
|
||||||
now := T1.Add(-7 * time.Minute)
|
|
||||||
times, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if len(times) != 0 {
|
|
||||||
t.Errorf("expected no start times, got: %v", times)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 2: no known start times, and one needed.
|
|
||||||
// Creation time is before T1.
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
|
|
||||||
// Current time is after T1
|
|
||||||
now := T1.Add(2 * time.Second)
|
|
||||||
times, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if len(times) != 1 {
|
|
||||||
t.Errorf("expected 1 start time, got: %v", times)
|
|
||||||
} else if !times[0].Equal(T1) {
|
|
||||||
t.Errorf("expected: %v, got: %v", T1, times[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 3: known LastScheduleTime, no start needed.
|
|
||||||
// Creation time is before T1.
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
|
|
||||||
// Status shows a start at the expected time.
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
|
|
||||||
// Current time is after T1
|
|
||||||
now := T1.Add(2 * time.Minute)
|
|
||||||
times, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if len(times) != 0 {
|
|
||||||
t.Errorf("expected 0 start times, got: %v", times)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 4: known LastScheduleTime, a start needed
|
|
||||||
// Creation time is before T1.
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
|
|
||||||
// Status shows a start at the expected time.
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
|
|
||||||
// Current time is after T1 and after T2
|
|
||||||
now := T2.Add(5 * time.Minute)
|
|
||||||
times, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if len(times) != 1 {
|
|
||||||
t.Errorf("expected 1 start times, got: %v", times)
|
|
||||||
} else if !times[0].Equal(T2) {
|
|
||||||
t.Errorf("expected: %v, got: %v", T1, times[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 5: known LastScheduleTime, two starts needed
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
|
|
||||||
// Current time is after T1 and after T2
|
|
||||||
now := T2.Add(5 * time.Minute)
|
|
||||||
times, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if len(times) != 2 {
|
|
||||||
t.Errorf("expected 2 start times, got: %v", times)
|
|
||||||
} else {
|
|
||||||
if !times[0].Equal(T1) {
|
|
||||||
t.Errorf("expected: %v, got: %v", T1, times[0])
|
|
||||||
}
|
|
||||||
if !times[1].Equal(T2) {
|
|
||||||
t.Errorf("expected: %v, got: %v", T2, times[1])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 6: now is way way ahead of last start time, and there is no deadline.
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
|
|
||||||
now := T2.Add(10 * 24 * time.Hour)
|
|
||||||
_, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("expected an error")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// Case 7: now is way way ahead of last start time, but there is a short deadline.
|
|
||||||
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
|
|
||||||
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
|
|
||||||
now := T2.Add(10 * 24 * time.Hour)
|
|
||||||
// Deadline is short
|
|
||||||
deadline := int64(2 * 60 * 60)
|
|
||||||
cj.Spec.StartingDeadlineSeconds = &deadline
|
|
||||||
_, err := getRecentUnmetScheduleTimes(cj, now)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestByJobStartTime(t *testing.T) {
|
func TestByJobStartTime(t *testing.T) {
|
||||||
now := metav1.NewTime(time.Date(2018, time.January, 1, 2, 3, 4, 5, time.UTC))
|
now := metav1.NewTime(time.Date(2018, time.January, 1, 2, 3, 4, 5, time.UTC))
|
||||||
later := metav1.NewTime(time.Date(2019, time.January, 1, 2, 3, 4, 5, time.UTC))
|
later := metav1.NewTime(time.Date(2019, time.January, 1, 2, 3, 4, 5, time.UTC))
|
||||||
@ -708,3 +360,20 @@ func TestGetMostRecentScheduleTime(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func topOfTheHour() *time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
return &T1
|
||||||
|
}
|
||||||
|
|
||||||
|
func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time {
|
||||||
|
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
|
||||||
|
if err != nil {
|
||||||
|
panic("test setup error")
|
||||||
|
}
|
||||||
|
t := T1.Add(duration)
|
||||||
|
return &t
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user