Merge pull request #29137 from janetkuo/scheduledjob-controller

Automatic merge from submit-queue

Scheduledjob controller

Supersedes #25952; first 7 commits come from #25816

<!--
Checklist for submitting a Pull Request

Please remove this comment block before submitting.

1. Please read our [contributor guidelines](https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md).
2. See our [developer guide](https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md).
3. If you want this PR to automatically close an issue when it is merged,
   add `fixes #<issue number>` or `fixes #<issue number>, fixes #<issue number>`
   to close multiple issues (see: https://github.com/blog/1506-closing-issues-via-pull-requests).
4. Follow the instructions for [labeling and writing a release note for this PR](https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes) in the block below.
-->

```release-note
```
This commit is contained in:
Kubernetes Submit Queue 2016-08-05 18:07:48 -07:00 committed by GitHub
commit 9083ee971e
13 changed files with 1516 additions and 43 deletions

View File

@ -62,6 +62,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
"k8s.io/kubernetes/pkg/controller/scheduledjob"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
@ -381,6 +382,21 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
}
}
groupVersion = "batch/v2alpha1"
resources, found = resourceMap[groupVersion]
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "scheduledjobs") {
glog.Infof("Starting scheduledjob controller")
go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Not starting %s apis", groupVersion)
}
provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {
glog.Fatalf("A Provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)

View File

@ -108,6 +108,10 @@ type Config struct {
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
}
// TLSClientConfig contains settings to enable transport layer security

View File

@ -55,34 +55,34 @@ var _ ScheduledJobInterface = &scheduledJobs{}
// List returns a list of scheduled jobs that match the label and field selectors.
func (c *scheduledJobs) List(opts api.ListOptions) (result *batch.ScheduledJobList, err error) {
result = &batch.ScheduledJobList{}
err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result)
return
}
// Get returns information about a particular scheduled job.
func (c *scheduledJobs) Get(name string) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").Name(name).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").Name(name).Do().Into(result)
return
}
// Create creates a new scheduled job.
func (c *scheduledJobs) Create(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Post().Namespace(c.ns).Resource("scheduledJobs").Body(job).Do().Into(result)
err = c.r.Post().Namespace(c.ns).Resource("scheduledjobs").Body(job).Do().Into(result)
return
}
// Update updates an existing scheduled job.
func (c *scheduledJobs) Update(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).Body(job).Do().Into(result)
err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).Body(job).Do().Into(result)
return
}
// Delete deletes a scheduled job, returns error if one occurs.
func (c *scheduledJobs) Delete(name string, options *api.DeleteOptions) (err error) {
return c.r.Delete().Namespace(c.ns).Resource("scheduledJobs").Name(name).Body(options).Do().Error()
return c.r.Delete().Namespace(c.ns).Resource("scheduledjobs").Name(name).Body(options).Do().Error()
}
// Watch returns a watch.Interface that watches the requested scheduled jobs.
@ -90,7 +90,7 @@ func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("scheduledJobs").
Resource("scheduledjobs").
VersionedParams(&opts, api.ParameterCodec).
Watch()
}
@ -98,6 +98,6 @@ func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) {
// UpdateStatus takes the name of the scheduled job and the new status. Returns the server's representation of the scheduled job, and an error, if it occurs.
func (c *scheduledJobs) UpdateStatus(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result)
err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result)
return
}

View File

@ -113,7 +113,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie
framework.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
if job := cur.(*batch.Job); !isJobFinished(job) {
if job := cur.(*batch.Job); !IsJobFinished(job) {
jm.enqueueController(job)
}
},
@ -347,7 +347,7 @@ func (jm *JobController) syncJob(key string) error {
job.Status.StartTime = &now
}
// if job was finished previously, we don't want to redo the termination
if isJobFinished(&job) {
if IsJobFinished(&job) {
return nil
}
if pastActiveDeadline(&job) {
@ -559,15 +559,6 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int {
return result
}
func isJobFinished(j *batch.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue {
return true
}
}
return false
}
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []batch.Job

View File

@ -657,31 +657,6 @@ func TestWatchJobs(t *testing.T) {
<-received
}
func TestIsJobFinished(t *testing.T) {
job := &batch.Job{
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: batch.JobComplete,
Status: api.ConditionTrue,
}},
},
}
if !isJobFinished(job) {
t.Error("Job was expected to be finished")
}
job.Status.Conditions[0].Status = api.ConditionFalse
if isJobFinished(job) {
t.Error("Job was not expected to be finished")
}
job.Status.Conditions[0].Status = api.ConditionUnknown
if isJobFinished(job) {
t.Error("Job was not expected to be finished")
}
}
func TestWatchPods(t *testing.T) {
testJob := newJob(2, 2)
clientset := fake.NewSimpleClientset(testJob)

View File

@ -0,0 +1,31 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
)
func IsJobFinished(j *batch.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue {
return true
}
}
return false
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
)
func TestIsJobFinished(t *testing.T) {
job := &batch.Job{
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: batch.JobComplete,
Status: api.ConditionTrue,
}},
},
}
if !IsJobFinished(job) {
t.Error("Job was expected to be finished")
}
job.Status.Conditions[0].Status = api.ConditionFalse
if IsJobFinished(job) {
t.Error("Job was not expected to be finished")
}
job.Status.Conditions[0].Status = api.ConditionUnknown
if IsJobFinished(job) {
t.Error("Job was not expected to be finished")
}
}

View File

@ -0,0 +1,251 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or ScheduledJobs, we we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)
Just periodically list jobs and SJs, and then reconcile them.
*/
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// Utilities for dealing with Jobs and ScheduledJobs and time.
type ScheduledJobController struct {
kubeClient clientset.Interface
jobControl jobControlInterface
sjControl sjControlInterface
recorder record.EventRecorder
}
func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter())
}
jm := &ScheduledJobController{
kubeClient: kubeClient,
jobControl: realJobControl{KubeClient: kubeClient},
sjControl: &realSJControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}),
}
return jm
}
func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController {
jm := NewScheduledJobController(kubeClient)
return jm
}
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *ScheduledJobController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting ScheduledJob Manager")
// Check things every 10 second.
go wait.Until(jm.SyncAll, 10*time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down ScheduledJob Manager")
}
// SyncAll lists all the ScheduledJobs and Jobs and reconciles them.
func (jm *ScheduledJobController) SyncAll() {
sjl, err := jm.kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Error listing scheduledjobs: %v", err)
return
}
sjs := sjl.Items
glog.Info("Found %d scheduledjobs", len(sjs))
jl, err := jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Error listing jobs")
return
}
js := jl.Items
glog.Info("Found %d jobs", len(js))
jobsBySj := groupJobsByParent(sjs, js)
glog.Info("Found %d groups", len(jobsBySj))
for _, sj := range sjs {
SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
}
}
// SyncOne reconciles a ScheduledJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
for _, j := range js {
found := inActiveList(sj, j.ObjectMeta.UID)
if !found {
recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
// We found a job object that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.
// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
// stop users from creating jobs if they have permission. It is assumed that if a
// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
} else {
if job.IsJobFinished(&j) {
deleteFromActiveList(&sj, j.ObjectMeta.UID)
// TODO: event to call out failure vs success.
recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
}
}
}
err := sjc.UpdateStatus(&sj)
if err != nil {
glog.Errorf("Unable to update status for %s: %v", nameForLog, err)
}
if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return
}
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
}
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
if len(times) == 0 {
glog.V(4).Infof("No unmet start times for %s", nameForLog)
return
}
if len(times) > 1 {
glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog)
}
scheduledTime := times[len(times)-1]
tooLate := false
if sj.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
glog.Errorf("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
// and event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.
return
}
if sj.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(sj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod).
// So it is theoretically possible to have concurrency with Forbid.
// As long the as the invokations are "far enough apart in time", this usually won't happen.
//
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog)
return
}
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
for _, j := range sj.Status.Active {
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog)
if err := jc.DeleteJob(j.Namespace, j.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
return
}
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
}
}
jobReq, err := getJobFromTemplate(&sj)
if err != nil {
glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
return
}
jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return
}
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ //
// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job on
// the next time. Actually, if we relist the SJs and Jobs on the next
// iteration of SyncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice. TODO: name the job
// deterministically (via hash of its scheduled time).
// Add the just-started job to the status list.
ref, err := getRef(jobResp)
if err != nil {
glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
} else {
sj.Status.Active = append(sj.Status.Active, *ref)
}
sj.Status.LastScheduleTime = &unversioned.Time{Time: scheduledTime}
if err := sjc.UpdateStatus(&sj); err != nil {
glog.Infof("Unable to update status for %s: %v", nameForLog, err)
}
return
}
func getRef(object runtime.Object) (*api.ObjectReference, error) {
return api.GetReference(object)
}

View File

@ -0,0 +1,394 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/types"
)
// schedule is hourly on the hour
var (
onTheHour string = "0 0 * * * ?"
)
func justBeforeTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func topOfTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justAfterTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justBeforeThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justAfterThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
// returns a scheduledJob with some fields filled in.
func scheduledJob() batch.ScheduledJob {
return batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/jobs/myscheduledjob",
CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()},
},
Spec: batch.ScheduledJobSpec{
Schedule: "0 0 * * * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: jobSpec(),
},
},
}
}
func jobSpec() batch.JobSpec {
return batch.JobSpec{
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Image: "foo/bar"},
},
},
},
}
}
func newJob(UID string) batch.Job {
return batch.Job{
ObjectMeta: api.ObjectMeta{
UID: types.UID(UID),
Name: "foobar",
Namespace: api.NamespaceDefault,
SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/myjob",
},
Spec: jobSpec(),
}
}
var (
shortDead int64 = 10
longDead int64 = 1000000
noDead int64 = -12345
A batch.ConcurrencyPolicy = batch.AllowConcurrent
f batch.ConcurrencyPolicy = batch.ForbidConcurrent
R batch.ConcurrencyPolicy = batch.ReplaceConcurrent
T bool = true
F bool = false
)
func TestSyncOne_RunOrNot(t *testing.T) {
testCases := map[string]struct {
// sj spec
concurrencyPolicy batch.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
// sj status
ranPreviously bool
stillActive bool
// environment
now time.Time
// expectations
expectCreate bool
expectDelete bool
}{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F},
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F},
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F},
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F},
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F},
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F},
"still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F},
"still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F},
"still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T},
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F},
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F},
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F},
}
for name, tc := range testCases {
t.Log("Test case:", name)
sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
sj.Spec.Suspend = &tc.suspend
sj.Spec.Schedule = tc.schedule
if tc.deadline != noDead {
sj.Spec.StartingDeadlineSeconds = &tc.deadline
}
if tc.ranPreviously {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()}
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
if tc.stillActive {
sj.Status.Active = []api.ObjectReference{{}}
}
} else {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
if tc.stillActive {
t.Errorf("Test setup error: this case makes no sense.")
}
}
jc := &fakeJobControl{}
sjc := &fakeSJControl{}
recorder := record.NewFakeRecorder(10)
SyncOne(sj, []batch.Job{}, tc.now, jc, sjc, recorder)
expectedCreates := 0
if tc.expectCreate {
expectedCreates = 1
}
if len(jc.Jobs) != expectedCreates {
t.Errorf("Expected %d job started, actually %v", expectedCreates, len(jc.Jobs))
}
expectedDeletes := 0
if tc.expectDelete {
expectedDeletes = 1
}
if len(jc.DeleteJobName) != expectedDeletes {
t.Errorf("Expected %d job deleted, actually %v", expectedDeletes, len(jc.DeleteJobName))
}
expectedEvents := 0
if tc.expectCreate {
expectedEvents += 1
}
if tc.expectDelete {
expectedEvents += 1
}
if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v", expectedEvents, len(recorder.Events))
}
}
}
// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
// but over time, all jobs run as expected (assuming Allow and no deadline).
// TestSyncOne_Status tests sj.UpdateStatus in SyncOne
func TestSyncOne_Status(t *testing.T) {
finishedJob := newJob("1")
finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batch.JobCondition{Type: batch.JobComplete, Status: api.ConditionTrue})
unexpectedJob := newJob("2")
testCases := map[string]struct {
// sj spec
concurrencyPolicy batch.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
// sj status
ranPreviously bool
hasFinishedJob bool
// environment
now time.Time
hasUnexpectedJob bool
// expectations
expectCreate bool
expectDelete bool
}{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F},
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F},
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F},
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F},
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F},
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F},
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F},
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F},
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F},
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F},
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F},
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, T, F},
}
for name, tc := range testCases {
t.Log("Test case:", name)
// Setup the test
sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
sj.Spec.Suspend = &tc.suspend
sj.Spec.Schedule = tc.schedule
if tc.deadline != noDead {
sj.Spec.StartingDeadlineSeconds = &tc.deadline
}
if tc.ranPreviously {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()}
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
} else {
if tc.hasFinishedJob || tc.hasUnexpectedJob {
t.Errorf("Test setup error: this case makes no sense.")
}
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
}
jobs := []batch.Job{}
if tc.hasFinishedJob {
ref, err := getRef(&finishedJob)
if err != nil {
t.Errorf("Test setup error: failed to get job's ref: %v.", err)
}
sj.Status.Active = []api.ObjectReference{*ref}
jobs = append(jobs, finishedJob)
}
if tc.hasUnexpectedJob {
jobs = append(jobs, unexpectedJob)
}
jc := &fakeJobControl{}
sjc := &fakeSJControl{}
recorder := record.NewFakeRecorder(10)
// Run the code
SyncOne(sj, jobs, tc.now, jc, sjc, recorder)
// Status update happens once when ranging through job list, and another one if create jobs.
expectUpdates := 1
// Events happens when there's unexpected / finished jobs, and upon job creation / deletion.
expectedEvents := 0
if tc.expectCreate {
expectUpdates++
expectedEvents++
}
if tc.expectDelete {
expectedEvents++
}
if tc.hasFinishedJob {
expectedEvents++
}
if tc.hasUnexpectedJob {
expectedEvents++
}
if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v: %#v", expectedEvents, len(recorder.Events), recorder.Events)
}
if expectUpdates != len(sjc.Updates) {
t.Errorf("expected %d status updates, actually %d", expectUpdates, len(sjc.Updates))
}
if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) {
t.Errorf("Expected finished job removed from active list, actually active list = %#v.", sjc.Updates[0].Status.Active)
}
if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) {
t.Errorf("Expected unexpected job not added to active list, actually active list = %#v.", sjc.Updates[0].Status.Active)
}
if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) {
t.Errorf("Expected LastScheduleTime updated to %s, got %s.", topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime)
}
}
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package scheduledjob contains the controller for ScheduledJob objects.
package scheduledjob

View File

@ -0,0 +1,138 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"sync"
"k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/labels"
)
// sjControlInterface is an interface that knows how to update ScheduledJob status
// created as an interface to allow testing.
type sjControlInterface interface {
UpdateStatus(sj *batch.ScheduledJob) error
}
// realSJControl is the default implementation of sjControlInterface.
type realSJControl struct {
KubeClient clientset.Interface
}
var _ sjControlInterface = &realSJControl{}
func (c *realSJControl) UpdateStatus(sj *batch.ScheduledJob) error {
_, err := c.KubeClient.Batch().ScheduledJobs(sj.Namespace).UpdateStatus(sj)
return err
}
// fakeSJControl is the default implementation of sjControlInterface.
type fakeSJControl struct {
Updates []batch.ScheduledJob
}
var _ sjControlInterface = &fakeSJControl{}
func (c *fakeSJControl) UpdateStatus(sj *batch.ScheduledJob) error {
c.Updates = append(c.Updates, *sj)
return nil
}
// ------------------------------------------------------------------ //
// jobControlInterface is an interface that knows how to add or delete jobs
// created as an interface to allow testing.
type jobControlInterface interface {
// CreateJob creates new jobs according to the spec
CreateJob(namespace string, job *batch.Job) (*batch.Job, error)
// DeleteJob deletes the job identified by name.
// TODO: delete by UID?
DeleteJob(namespace string, name string) error
}
// realJobControl is the default implementation of jobControlInterface.
type realJobControl struct {
KubeClient clientset.Interface
Recorder record.EventRecorder
}
var _ jobControlInterface = &realJobControl{}
func copyLabels(template *batch.JobTemplateSpec) labels.Set {
l := make(labels.Set)
for k, v := range template.Labels {
l[k] = v
}
return l
}
func copyAnnotations(template *batch.JobTemplateSpec) labels.Set {
a := make(labels.Set)
for k, v := range template.Annotations {
a[k] = v
}
return a
}
func (r realJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) {
return r.KubeClient.Batch().Jobs(namespace).Create(job)
}
func (r realJobControl) DeleteJob(namespace string, name string) error {
return r.KubeClient.Batch().Jobs(namespace).Delete(name, nil)
}
type fakeJobControl struct {
sync.Mutex
Jobs []batch.Job
DeleteJobName []string
Err error
}
var _ jobControlInterface = &fakeJobControl{}
func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
f.Jobs = append(f.Jobs, *job)
job.UID = "test-uid"
return job, nil
}
func (f *fakeJobControl) DeleteJob(namespace string, name string) error {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return f.Err
}
f.DeleteJobName = append(f.DeleteJobName, name)
return nil
}
func (f *fakeJobControl) Clear() {
f.Lock()
defer f.Unlock()
f.DeleteJobName = []string{}
f.Jobs = []batch.Job{}
f.Err = nil
}

View File

@ -0,0 +1,227 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"encoding/json"
"fmt"
"time"
"github.com/golang/glog"
"github.com/robfig/cron"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
)
// Utilities for dealing with Jobs and ScheduledJobs and time.
const (
CreatedByAnnotation = "kubernetes.io/created-by"
)
func inActiveList(sj batch.ScheduledJob, uid types.UID) bool {
for _, j := range sj.Status.Active {
if j.UID == uid {
return true
}
}
return false
}
func deleteFromActiveList(sj *batch.ScheduledJob, uid types.UID) {
if sj == nil {
return
}
newActive := []api.ObjectReference{}
for _, j := range sj.Status.Active {
if j.UID != uid {
newActive = append(newActive, j)
}
}
sj.Status.Active = newActive
}
// getParentUIDFromJob extracts UID of job's parent and whether it was found
func getParentUIDFromJob(j batch.Job) (types.UID, bool) {
creatorRefJson, found := j.ObjectMeta.Annotations[CreatedByAnnotation]
if !found {
glog.V(4).Infof("Job with no created-by annotation, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}
var sr api.SerializedReference
err := json.Unmarshal([]byte(creatorRefJson), &sr)
if err != nil {
glog.V(4).Infof("Job with unparsable created-by annotation, name %s namespace %s: %v", j.Name, j.Namespace, err)
return types.UID(""), false
}
if sr.Reference.Kind != "ScheduledJob" {
glog.V(4).Infof("Job with non-ScheduledJob parent, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}
// Don't believe a job that claims to have a parent in a different namespace.
if sr.Reference.Namespace != j.Namespace {
glog.V(4).Infof("Alleged scheduledJob parent in different namespace (%s) from Job name %s namespace %s", sr.Reference.Namespace, j.Name, j.Namespace)
return types.UID(""), false
}
return sr.Reference.UID, true
}
// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. scheduledJob).
// It has no receiver, to facilitate testing.
func groupJobsByParent(sjs []batch.ScheduledJob, js []batch.Job) map[types.UID][]batch.Job {
jobsBySj := make(map[types.UID][]batch.Job)
for _, job := range js {
parentUID, found := getParentUIDFromJob(job)
if !found {
glog.Errorf("Unable to get uid from job %s in namespace %s", job.Name, job.Namespace)
continue
}
jobsBySj[parentUID] = append(jobsBySj[parentUID], job)
}
return jobsBySj
}
// getNextStartTimeAfter gets the latest scheduled start time that is less than "now", or an error.
func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) {
// Using robfig/cron for cron scheduled parsing and next runtime
// computation. Not using the entire library because:
// - I want to detect when we missed a runtime due to being down.
// - How do I set the time such that I can detect the last known runtime?
// - I guess the functions could launch a go-routine to start the job and
// then return.
// How to handle concurrency control.
// How to detect changes to schedules or deleted schedules and then
// update the jobs?
sched, err := cron.Parse(schedule)
if err != nil {
return time.Unix(0, 0), fmt.Errorf("Unparseable schedule: %s : %s", schedule, err)
}
return sched.Next(now), nil
}
// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
//
// If there are too many (>100) unstarted times, just give up and return an empty slice.
// If there were missed times prior to the last known start time, then those are not returned.
func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{}
sched, err := cron.Parse(sj.Spec.Schedule)
if err != nil {
return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
}
var earliestTime time.Time
if sj.Status.LastScheduleTime != nil {
earliestTime = sj.Status.LastScheduleTime.Time
} else {
// If none found, then this is either a recently created scheduledJob,
// or the active/completed info was somehow lost (contract for status
// in kubernetes says it may need to be recreated), or that we have
// started a job, but have not noticed it yet (distributed systems can
// have arbitrary delays). In any case, use the creation time of the
// ScheduledJob as last known start time.
earliestTime = sj.ObjectMeta.CreationTimestamp.Time
}
if earliestTime.After(now) {
return []time.Time{}, nil
}
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
starts = append(starts, t)
// An object might miss several starts. For example, if
// controller gets wedged on friday at 5:01pm when everyone has
// gone home, and someone comes in on tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly scheduledJob, should
// all start running with no further intervention (if the scheduledJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we want to not try to list
// all the misseded start times.
//
// I've somewhat arbitrarily picked 100, as more than 80, but
// but less than "lots".
if len(starts) > 100 {
// We can't get the most recent times so just return an empty slice
return []time.Time{}, fmt.Errorf("Too many missed start times to list")
}
}
return starts, nil
}
// XXX unit test this
// getJobFromTemplate makes a Job from a ScheduledJob
func getJobFromTemplate(sj *batch.ScheduledJob) (*batch.Job, error) {
// TODO: consider adding the following labels:
// nominal-start-time=$RFC_3339_DATE_OF_INTENDED_START -- for user convenience
// scheduled-job-name=$SJ_NAME -- for user convenience
labels := copyLabels(&sj.Spec.JobTemplate)
annotations := copyAnnotations(&sj.Spec.JobTemplate)
createdByRefJson, err := makeCreatedByRefJson(sj)
if err != nil {
return nil, err
}
annotations[CreatedByAnnotation] = string(createdByRefJson)
// TODO: instead of using generateName, use a deterministic hash of the nominal
// start time, to prevent same job being created twice.
// We want job names for a given nominal start time to have a predictable name to avoid
prefix := fmt.Sprintf("%s-", sj.Name)
job := &batch.Job{
ObjectMeta: api.ObjectMeta{
Labels: labels,
Annotations: annotations,
GenerateName: prefix,
},
}
if err := api.Scheme.Convert(&sj.Spec.JobTemplate.Spec, &job.Spec); err != nil {
return nil, fmt.Errorf("unable to convert job template: %v", err)
}
return job, nil
}
// makeCreatedByRefJson makes a json string with an object reference for use in "created-by" annotation value
func makeCreatedByRefJson(object runtime.Object) (string, error) {
createdByRef, err := api.GetReference(object)
if err != nil {
return "", fmt.Errorf("unable to get controller reference: %v", err)
}
// TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients
// would be broken upon reading it. This is explicitly hardcoded to v1 to guarantee predictable deployment.
// We need to consistently handle this case of annotation versioning.
codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"})
createdByRefJson, err := runtime.Encode(codec, &api.SerializedReference{
Reference: *createdByRef,
})
if err != nil {
return "", fmt.Errorf("unable to serialize controller reference: %v", err)
}
return string(createdByRefJson), nil
}

View File

@ -0,0 +1,379 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
//"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/types"
//"k8s.io/kubernetes/pkg/controller"
// "k8s.io/kubernetes/pkg/util/rand"
)
func TestGetJobFromTemplate(t *testing.T) {
// getJobFromTemplate() needs to take the job template and copy the labels and annotations
// and other fields, and add a created-by reference.
var one int64 = 1
var no bool = false
sj := batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
SelfLink: "/apis/extensions/v1beta1/namespaces/snazzycats/jobs/myscheduledjob",
},
Spec: batch.ScheduledJobSpec{
Schedule: "0 0 * * * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: batch.JobSpec{
ActiveDeadlineSeconds: &one,
ManualSelector: &no,
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Image: "foo/bar"},
},
},
},
},
},
},
}
var job *batch.Job
job, err := getJobFromTemplate(&sj)
if err != nil {
t.Errorf("Did not expect error: %s", err)
}
if job.ObjectMeta.GenerateName != "myscheduledjob-" {
t.Errorf("Wrong GenerateName")
}
if len(job.ObjectMeta.Labels) != 1 {
t.Errorf("Wrong number of labels")
}
if len(job.ObjectMeta.Annotations) != 2 {
t.Errorf("Wrong number of annotations")
}
v, ok := job.ObjectMeta.Annotations["kubernetes.io/created-by"]
if !ok {
t.Errorf("Missing created-by annotation")
}
expectedCreatedBy := `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"snazzycats","name":"myscheduledjob","uid":"1a2b3c","apiVersion":"extensions"}}
`
if len(v) != len(expectedCreatedBy) {
t.Errorf("Wrong length for created-by annotation, expected %v got %v", len(expectedCreatedBy), len(v))
}
if v != expectedCreatedBy {
t.Errorf("Wrong value for created-by annotation, expected %v got %v", expectedCreatedBy, v)
}
}
func TestGetParentUIDFromJob(t *testing.T) {
j := &batch.Job{
ObjectMeta: api.ObjectMeta{
Name: "foobar",
Namespace: api.NamespaceDefault,
},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Image: "foo/bar"},
},
},
},
},
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: batch.JobComplete,
Status: api.ConditionTrue,
}},
},
}
{
// Case 1: No UID annotation
_, found := getParentUIDFromJob(*j)
if found {
t.Errorf("Unexpectedly found uid")
}
}
{
// Case 2: Has UID annotation
j.ObjectMeta.Annotations = map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"default","name":"pi","uid":"5ef034e0-1890-11e6-8935-42010af0003e","apiVersion":"extensions","resourceVersion":"427339"}}`}
expectedUID := types.UID("5ef034e0-1890-11e6-8935-42010af0003e")
uid, found := getParentUIDFromJob(*j)
if !found {
t.Errorf("Unexpectedly did not find uid")
} else if uid != expectedUID {
t.Errorf("Wrong UID: %v", uid)
}
}
}
func TestGroupJobsByParent(t *testing.T) {
uid1 := types.UID("11111111-1111-1111-1111-111111111111")
uid2 := types.UID("22222222-2222-2222-2222-222222222222")
uid3 := types.UID("33333333-3333-3333-3333-333333333333")
createdBy1 := map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"x","name":"pi","uid":"11111111-1111-1111-1111-111111111111","apiVersion":"extensions","resourceVersion":"111111"}}`}
createdBy2 := map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"x","name":"pi","uid":"22222222-2222-2222-2222-222222222222","apiVersion":"extensions","resourceVersion":"222222"}}`}
createdBy3 := map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"y","name":"pi","uid":"33333333-3333-3333-3333-333333333333","apiVersion":"extensions","resourceVersion":"333333"}}`}
noCreatedBy := map[string]string{}
{
// Case 1: There are no jobs and scheduledJobs
sjs := []batch.ScheduledJob{}
js := []batch.Job{}
jobsBySj := groupJobsByParent(sjs, js)
if len(jobsBySj) != 0 {
t.Errorf("Wrong number of items in map")
}
}
{
// Case 2: there is one controller with no job.
sjs := []batch.ScheduledJob{
{ObjectMeta: api.ObjectMeta{Name: "e", Namespace: "x", UID: uid1}},
}
js := []batch.Job{}
jobsBySj := groupJobsByParent(sjs, js)
if len(jobsBySj) != 0 {
t.Errorf("Wrong number of items in map")
}
}
{
// Case 3: there is one controller with one job it created.
sjs := []batch.ScheduledJob{
{ObjectMeta: api.ObjectMeta{Name: "e", Namespace: "x", UID: uid1}},
}
js := []batch.Job{
{ObjectMeta: api.ObjectMeta{Name: "a", Namespace: "x", Annotations: createdBy1}},
}
jobsBySj := groupJobsByParent(sjs, js)
if len(jobsBySj) != 1 {
t.Errorf("Wrong number of items in map")
}
jobList1, found := jobsBySj[uid1]
if !found {
t.Errorf("Key not found")
}
if len(jobList1) != 1 {
t.Errorf("Wrong number of items in map")
}
}
{
// Case 4: Two namespaces, one has two jobs from one controller, other has 3 jobs from two controllers.
// There are also two jobs with no created-by annotation.
js := []batch.Job{
{ObjectMeta: api.ObjectMeta{Name: "a", Namespace: "x", Annotations: createdBy1}},
{ObjectMeta: api.ObjectMeta{Name: "b", Namespace: "x", Annotations: createdBy2}},
{ObjectMeta: api.ObjectMeta{Name: "c", Namespace: "x", Annotations: createdBy1}},
{ObjectMeta: api.ObjectMeta{Name: "d", Namespace: "x", Annotations: noCreatedBy}},
{ObjectMeta: api.ObjectMeta{Name: "a", Namespace: "y", Annotations: createdBy3}},
{ObjectMeta: api.ObjectMeta{Name: "b", Namespace: "y", Annotations: createdBy3}},
{ObjectMeta: api.ObjectMeta{Name: "d", Namespace: "y", Annotations: noCreatedBy}},
}
sjs := []batch.ScheduledJob{
{ObjectMeta: api.ObjectMeta{Name: "e", Namespace: "x", UID: uid1}},
{ObjectMeta: api.ObjectMeta{Name: "f", Namespace: "x", UID: uid2}},
{ObjectMeta: api.ObjectMeta{Name: "g", Namespace: "y", UID: uid3}},
}
jobsBySj := groupJobsByParent(sjs, js)
if len(jobsBySj) != 3 {
t.Errorf("Wrong number of items in map")
}
jobList1, found := jobsBySj[uid1]
if !found {
t.Errorf("Key not found")
}
if len(jobList1) != 2 {
t.Errorf("Wrong number of items in map")
}
jobList2, found := jobsBySj[uid2]
if !found {
t.Errorf("Key not found")
}
if len(jobList2) != 1 {
t.Errorf("Wrong number of items in map")
}
jobList3, found := jobsBySj[uid3]
if !found {
t.Errorf("Key not found")
}
if len(jobList3) != 2 {
t.Errorf("Wrong number of items in map")
}
}
}
func TestGetRecentUnmetScheduleTimes(t *testing.T) {
// schedule is hourly on the hour
schedule := "0 0 * * * ?"
// T1 is a scheduled start time of that schedule
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
t.Errorf("test setup error: %v", err)
}
// T2 is a scheduled start time of that schedule after T1
T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z")
if err != nil {
t.Errorf("test setup error: %v", err)
}
sj := batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: api.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.ScheduledJobSpec{
Schedule: schedule,
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{},
},
}
{
// Case 1: no known start times, and none needed yet.
// Creation time is before T1.
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is more than creation time, but less than T1.
now := T1.Add(-7 * time.Minute)
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 0 {
t.Errorf("expected no start times, got: %v", times)
}
}
{
// Case 2: no known start times, and one needed.
// Creation time is before T1.
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is after T1
now := T1.Add(2 * time.Second)
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 1 {
t.Errorf("expected 1 start time, got: %v", times)
} else if !times[0].Equal(T1) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
}
{
// Case 3: known LastScheduleTime, no start needed.
// Creation time is before T1.
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)}
// Status shows a start at the expected time.
sj.Status.LastScheduleTime = &unversioned.Time{Time: T1}
// Current time is after T1
now := T1.Add(2 * time.Minute)
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 0 {
t.Errorf("expected 0 start times, got: , got: %v", times)
}
}
{
// Case 4: known LastScheduleTime, a start needed
// Creation time is before T1.
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)}
// Status shows a start at the expected time.
sj.Status.LastScheduleTime = &unversioned.Time{Time: T1}
// Current time is after T1 and after T2
now := T2.Add(5 * time.Minute)
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 1 {
t.Errorf("expected 2 start times, got: , got: %v", times)
} else if !times[0].Equal(T2) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
}
{
// Case 5: known LastScheduleTime, two starts needed
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-2 * time.Hour)}
sj.Status.LastScheduleTime = &unversioned.Time{Time: T1.Add(-1 * time.Hour)}
// Current time is after T1 and after T2
now := T2.Add(5 * time.Minute)
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(times) != 2 {
t.Errorf("expected 2 start times, got: , got: %v", times)
} else {
if !times[0].Equal(T1) {
t.Errorf("expected: %v, got: %v", T1, times[0])
}
if !times[1].Equal(T2) {
t.Errorf("expected: %v, got: %v", T2, times[1])
}
}
}
{
// Case 6: now is way way ahead of last start time.
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-2 * time.Hour)}
sj.Status.LastScheduleTime = &unversioned.Time{Time: T1.Add(-1 * time.Hour)}
now := T2.Add(10 * 24 * time.Hour)
_, err := getRecentUnmetScheduleTimes(sj, now)
if err == nil {
t.Errorf("unexpected lack of error")
}
}
}