ScheduledJob controller

This commit is contained in:
Eric Tune 2016-05-20 00:53:39 -07:00 committed by Janet Kuo
parent e4b067efd0
commit 8675e014fb
10 changed files with 900 additions and 7 deletions

View File

@ -62,6 +62,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
"k8s.io/kubernetes/pkg/controller/scheduledjob"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
@ -381,6 +382,21 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
}
}
groupVersion = "batch/v2alpha1"
resources, found = resourceMap[groupVersion]
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "scheduledjobs") {
glog.Infof("Starting scheduledjob controller")
go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Not starting %s apis", groupVersion)
}
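For context, the containsVersion and containsResource gates above are small lookup helpers elsewhere in this file; the sketch below is a rough guess at their shape, with signatures inferred only from the call sites:

// Sketch only: shapes inferred from the call sites above, not part of this diff.
func containsVersion(versions *unversioned.APIVersions, version string) bool {
	for _, v := range versions.Versions {
		if v == version {
			return true
		}
	}
	return false
}

func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
	for _, r := range resources.APIResources {
		if r.Name == resourceName {
			return true
		}
	}
	return false
}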
provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {
glog.Fatalf("A Provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)

View File

@ -18,6 +18,7 @@ package unversioned
import (
api "k8s.io/kubernetes/pkg/api"
//"k8s.io/kubernetes/pkg/api/unversioned"
registered "k8s.io/kubernetes/pkg/apimachinery/registered"
restclient "k8s.io/kubernetes/pkg/client/restclient"
)
@ -81,9 +82,17 @@ func setConfigDefaults(config *restclient.Config) error {
}
// TODO: Unconditionally set the config.Version, until we fix the config.
//if config.Version == "" {
// XXX why is above commented out?
copyGroupVersion := g.GroupVersion
config.GroupVersion = &copyGroupVersion
//}
// Do we need something like this:
// if config.Version == "" {
// copyGroupVersion := g.GroupVersion
// config.GroupVersion = &copyGroupVersion
//} else {
// config.GroupVersion = &unversioned.GroupVersion{Group: "batch", Version: config.Version}
//}
config.NegotiatedSerializer = api.Codecs

View File

@ -108,6 +108,10 @@ type Config struct {
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
}
// TLSClientConfig contains settings to enable transport layer security

View File

@ -55,34 +55,34 @@ var _ ScheduledJobInterface = &scheduledJobs{}
// List returns a list of scheduled jobs that match the label and field selectors.
func (c *scheduledJobs) List(opts api.ListOptions) (result *batch.ScheduledJobList, err error) {
result = &batch.ScheduledJobList{}
err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result)
return
}
// Get returns information about a particular scheduled job.
func (c *scheduledJobs) Get(name string) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").Name(name).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").Name(name).Do().Into(result)
return
}
// Create creates a new scheduled job.
func (c *scheduledJobs) Create(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Post().Namespace(c.ns).Resource("scheduledJobs").Body(job).Do().Into(result)
err = c.r.Post().Namespace(c.ns).Resource("scheduledjobs").Body(job).Do().Into(result)
return
}
// Update updates an existing scheduled job.
func (c *scheduledJobs) Update(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).Body(job).Do().Into(result)
err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).Body(job).Do().Into(result)
return
}
// Delete deletes a scheduled job, returns error if one occurs.
func (c *scheduledJobs) Delete(name string, options *api.DeleteOptions) (err error) {
return c.r.Delete().Namespace(c.ns).Resource("scheduledJobs").Name(name).Body(options).Do().Error()
return c.r.Delete().Namespace(c.ns).Resource("scheduledjobs").Name(name).Body(options).Do().Error()
}
// Watch returns a watch.Interface that watches the requested scheduled jobs.
@ -90,7 +90,7 @@ func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("scheduledJobs").
Resource("scheduledjobs").
VersionedParams(&opts, api.ParameterCodec).
Watch()
}
@ -98,6 +98,6 @@ func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) {
// UpdateStatus takes the name of the scheduled job and the new status. Returns the server's representation of the scheduled job, and an error, if it occurs.
func (c *scheduledJobs) UpdateStatus(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result)
err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result)
return
}
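For reference, a hedged sketch of how a caller would use this client through the generated clientset; it mirrors the controller code added later in this commit, and the error handling is illustrative:

// Illustrative usage only.
sjs, err := kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
	glog.Errorf("Error listing scheduledjobs: %v", err)
	return
}
for i := range sjs.Items {
	glog.Infof("Found scheduledjob %s/%s", sjs.Items[i].Namespace, sjs.Items[i].Name)
}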

View File

@ -0,0 +1,253 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or ScheduledJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)
Just periodically list jobs and SJs, and then reconcile them.
*/
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// ScheduledJobController reconciles ScheduledJobs with the Jobs they spawn.
type ScheduledJobController struct {
kubeClient clientset.Interface
jobControl jobControlInterface
sjControl sjControlInterface
recorder record.EventRecorder
}
func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when all clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter())
}
jm := &ScheduledJobController{
kubeClient: kubeClient,
jobControl: realJobControl{KubeClient: kubeClient},
sjControl: &realSJControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}),
}
return jm
}
func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController {
jm := NewScheduledJobController(kubeClient)
return jm
}
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *ScheduledJobController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting ScheduledJob Manager")
// Check things every 1 second.
go wait.Until(jm.SyncAll, 1*time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down ScheduledJob Manager")
}
// SyncAll lists all the ScheduledJobs and Jobs and reconciles them.
func (jm *ScheduledJobController) SyncAll() {
sjl, err := jm.kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Error listing scheduledjobs: %v", err)
return
}
sjs := sjl.Items
glog.Info("Found %d scheduledjobs", len(sjs))
jl, err := jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Error listing jobs")
return
}
js := jl.Items
glog.Info("Found %d jobs", len(js))
jobsBySj := groupJobsByParent(sjs, js)
glog.Info("Found %d groups", len(jobsBySj))
for _, sj := range sjs {
SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
}
}
// SyncOne reconciles a ScheduledJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// It updates the ScheduledJob's status through sjc when changes are required.
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("namespace/%s/scheduledJob/%s", sj.Namespace, sj.Name)
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
for _, j := range js {
found := inActiveList(sj, j.ObjectMeta.UID)
if !found {
recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
// We found a job object that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.
// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
// stop users from creating jobs if they have permission. It is assumed that if a
// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
} else {
if isJobActive(&j) {
deleteFromActiveList(&sj, j.ObjectMeta.UID)
// TODO: event to call out failure vs success.
recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
}
}
}
err := sjc.UpdateStatus(&sj)
if err != nil {
glog.Errorf("Unable to update status for %s: %v", nameForLog, err)
}
if sj.Spec.Suspend {
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return
}
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
}
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
if len(times) == 0 {
glog.V(4).Infof("No unmet start times for %s", nameForLog)
return
}
if len(times) > 1 {
glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog)
}
scheduledTime := times[len(times)-1]
tooLate := false
if sj.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
glog.Errorf("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
// Status.LastScheduleTime, Status.LastMissedTime), so that we won't generate
// an event the next time we process it, and also so the user looking at the status
// can easily see that there was a missed execution.
return
}
if sj.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(sj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod).
// So it is theoretically possible to have concurrency with Forbid.
// As long as the invocations are "far enough apart in time", this usually won't happen.
//
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog)
return
}
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
glog.Errorf("Not starting job for %s because of prior execution still running and concurrency policy is Replace and delete is not supported yet", nameForLog)
for _, j := range sj.Status.Active {
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog)
if err := jc.DeleteJob(j.Namespace, j.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
return
}
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
}
}
jobReq, err := getJobFromTemplate(&sj)
if err != nil {
glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
return
}
jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return
}
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ //
// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job again
// the next time. Also, if we relist the SJs and Jobs on the next
// iteration of SyncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice. TODO: name the job
// deterministically.
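// One hypothetical shape for that deterministic name (an assumption, not part
// of this commit): derive it from the nominal start time, so a second create
// for the same nominal start fails with AlreadyExists instead of producing a
// duplicate job, e.g.:
//   jobReq.Name = fmt.Sprintf("%s-%d", sj.Name, scheduledTime.Unix())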
// Add the just-started job to the status list.
ref, err := getRef(jobResp)
if err != nil {
glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
} else {
sj.Status.Active = append(sj.Status.Active, *ref)
}
sj.Status.LastScheduleTime = &unversioned.Time{scheduledTime}
if err := sjc.UpdateStatus(&sj); err != nil {
glog.Infof("Unable to update status for %s: %v", nameForLog, err)
}
return
}
func getRef(object runtime.Object) (*api.ObjectReference, error) {
return api.GetReference(object)
}

View File

@ -0,0 +1,230 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/types"
"testing"
"time"
)
// schedule is hourly on the hour
var (
onTheHour string = "0 0 * * * ?"
)
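// Note: the robfig/cron parser used by this controller takes a seconds-first,
// 6-field spec (second minute hour day-of-month month day-of-week), so
// "0 0 * * * ?" fires at second 0, minute 0 of every hour.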
func justBeforeTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func topOfTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justAfterTheHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justBeforeThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
func justAfterThePriorHour() time.Time {
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z")
if err != nil {
panic("test setup error")
}
return T1
}
// returns a scheduledJob with some fields filled in.
func scheduledJob() batch.ScheduledJob {
return batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
SelfLink: "/apis/extensions/v1beta1/namespaces/snazzycats/jobs/myscheduledjob",
CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()},
},
Spec: batch.ScheduledJobSpec{
Schedule: "0 0 * * * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
},
Spec: batch.JobSpec{
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Image: "foo/bar"},
},
},
},
},
},
},
}
}
var (
shortDead int64 = 10
longDead int64 = 1000000
noDead int64 = -12345
A batch.ConcurrencyPolicy = batch.AllowConcurrent
f batch.ConcurrencyPolicy = batch.ForbidConcurrent
R batch.ConcurrencyPolicy = batch.ReplaceConcurrent
T bool = true
F bool = false
)
func TestSyncOne_RunOrNot(t *testing.T) {
testCases := map[string]struct {
// sj spec
concurrencyPolicy batch.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
// sj status
ranPreviously bool
stillActive bool
// environment
now time.Time
// expectations
expectCreate bool
expectDelete bool
}{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F},
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F},
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F},
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F},
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F},
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F},
"still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F},
"still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F},
"still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T},
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F},
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F},
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F},
}
for name, tc := range testCases {
t.Log("Test case:", name)
sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
sj.Spec.Suspend = tc.suspend
sj.Spec.Schedule = tc.schedule
if tc.deadline != noDead {
sj.Spec.StartingDeadlineSeconds = &tc.deadline
}
if tc.ranPreviously {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()}
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
if tc.stillActive {
sj.Status.Active = []api.ObjectReference{{}}
}
} else {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
if tc.stillActive {
t.Errorf("Test setup error: this case makes no sense.")
}
}
jc := &fakeJobControl{}
sjc := &fakeSJControl{}
recorder := record.NewFakeRecorder(10)
SyncOne(sj, []batch.Job{}, tc.now, jc, sjc, recorder)
expectedCreates := 0
if tc.expectCreate {
expectedCreates = 1
}
if len(jc.Jobs) != expectedCreates {
t.Errorf("Expected %d job started, actually %v", expectedCreates, len(jc.Jobs))
}
expectedDeletes := 0
if tc.expectDelete {
expectedDeletes = 1
}
if len(jc.DeleteJobName) != expectedDeletes {
t.Errorf("Expected %d job deleted, actually %v", expectedDeletes, len(jc.DeleteJobName))
}
expectedEvents := 0
if tc.expectCreate {
expectedEvents++
}
if tc.expectDelete {
expectedEvents++
}
if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v", expectedEvents, len(recorder.Events))
}
}
}
// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
// but over time, all jobs run as expected (assuming Allow and no deadline).

View File

@ -0,0 +1,138 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"sync"
"k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/labels"
)
// sjControlInterface is an interface that knows how to update ScheduledJob status
// created as an interface to allow testing.
type sjControlInterface interface {
UpdateStatus(sj *batch.ScheduledJob) error
}
// realSJControl is the default implementation of sjControlInterface.
type realSJControl struct {
KubeClient clientset.Interface
}
var _ sjControlInterface = &realSJControl{}
func (c *realSJControl) UpdateStatus(sj *batch.ScheduledJob) error {
_, err := c.KubeClient.Batch().ScheduledJobs(sj.Namespace).UpdateStatus(sj)
return err
}
// fakeSJControl is a fake implementation of sjControlInterface, for testing.
type fakeSJControl struct {
Updates []batch.ScheduledJob
}
var _ sjControlInterface = &fakeSJControl{}
func (c *fakeSJControl) UpdateStatus(sj *batch.ScheduledJob) error {
c.Updates = append(c.Updates, *sj)
return nil
}
// ------------------------------------------------------------------ //
// jobControlInterface is an interface that knows how to add or delete jobs
// created as an interface to allow testing.
type jobControlInterface interface {
// CreateJob creates new jobs according to the spec
CreateJob(namespace string, job *batch.Job) (*batch.Job, error)
// DeleteJob deletes the job identified by name.
// TODO: delete by UID?
DeleteJob(namespace string, name string) error
}
// realJobControl is the default implementation of jobControlInterface.
type realJobControl struct {
KubeClient clientset.Interface
Recorder record.EventRecorder
}
var _ jobControlInterface = &realJobControl{}
func copyLabels(template *batch.JobTemplateSpec) labels.Set {
l := make(labels.Set)
for k, v := range template.Labels {
l[k] = v
}
return l
}
func copyAnnotations(template *batch.JobTemplateSpec) labels.Set {
a := make(labels.Set)
for k, v := range template.Annotations {
a[k] = v
}
return a
}
func (r realJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) {
return r.KubeClient.Batch().Jobs(namespace).Create(job)
}
func (r realJobControl) DeleteJob(namespace string, name string) error {
return r.KubeClient.Batch().Jobs(namespace).Delete(name, nil)
}
type fakeJobControl struct {
sync.Mutex
Jobs []batch.Job
DeleteJobName []string
Err error
}
var _ jobControlInterface = &fakeJobControl{}
func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
f.Jobs = append(f.Jobs, *job)
job.UID = "test-uid"
return job, nil
}
func (f *fakeJobControl) DeleteJob(namespace string, name string) error {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return f.Err
}
f.DeleteJobName = append(f.DeleteJobName, name)
return nil
}
func (f *fakeJobControl) Clear() {
f.Lock()
defer f.Unlock()
f.DeleteJobName = []string{}
f.Jobs = []batch.Job{}
f.Err = nil
}

View File

@ -0,0 +1,239 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"encoding/json"
"fmt"
"time"
"github.com/golang/glog"
"github.com/robfig/cron"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
)
// Utilities for dealing with Jobs and ScheduledJobs and time.
const (
CreatedByAnnotation = "kubernetes.io/created-by"
)
func inActiveList(sj batch.ScheduledJob, uid types.UID) bool {
for _, j := range sj.Status.Active {
if j.UID == uid {
return true
}
}
return false
}
func deleteFromActiveList(sj *batch.ScheduledJob, uid types.UID) {
if sj == nil {
return
}
newActive := []api.ObjectReference{}
for _, j := range sj.Status.Active {
if j.UID != uid {
newActive = append(newActive, j)
}
}
sj.Status.Active = newActive
}
// getParentUIDFromJob extracts UID of job's parent and whether it was found
func getParentUIDFromJob(j batch.Job) (types.UID, bool) {
creatorRefJson, found := j.ObjectMeta.Annotations[CreatedByAnnotation]
if !found {
glog.V(4).Infof("Job with no created-by annotation, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}
var sr api.SerializedReference
err := json.Unmarshal([]byte(creatorRefJson), &sr)
if err != nil {
glog.V(4).Infof("Job with unparsable created-by annotation, name %s namespace %s: %v", j.Name, j.Namespace, err)
return types.UID(""), false
}
if sr.Reference.Kind != "ScheduledJob" {
glog.V(4).Infof("Job with non-ScheduledJob parent, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}
// Don't believe a job that claims to have a parent in a different namespace.
if sr.Reference.Namespace != j.Namespace {
glog.V(4).Infof("Alleged scheduledJob parent in different namespace (%s) from Job name %s namespace %s", sr.Reference.Namespace, j.Name, j.Namespace)
return types.UID(""), false
}
return sr.Reference.UID, true
}
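// Illustrative annotation payload parsed above, using the fixture values from
// the tests in this commit (formatting is hypothetical):
//
//   kubernetes.io/created-by:
//     {"kind":"SerializedReference","apiVersion":"v1",
//      "reference":{"kind":"ScheduledJob","namespace":"snazzycats",
//                   "name":"myscheduledjob","uid":"1a2b3c"}}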
// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. scheduledJob).
// It has no receiver, to facilitate testing.
func groupJobsByParent(sjs []batch.ScheduledJob, js []batch.Job) map[types.UID][]batch.Job {
jobsBySj := make(map[types.UID][]batch.Job)
for _, job := range js {
parentUID, found := getParentUIDFromJob(job)
if !found {
glog.Errorf("Unable to get uid from job %s in namespace %s", job.Name, job.Namespace)
continue
}
jobsBySj[parentUID] = append(jobsBySj[parentUID], job)
}
return jobsBySj
}
// getNextStartTimeAfter returns the next scheduled start time after "now", or an error.
func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) {
// Using robfig/cron for cron schedule parsing and next runtime
// computation. Not using the entire library because:
// - I want to detect when we missed a runtime due to being down.
// - How do I set the time such that I can detect the last known runtime?
// - I guess the functions could launch a go-routine to start the job and
// then return.
// How to handle concurrency control.
// How to detect changes to schedules or deleted schedules and then
// update the jobs?
sched, err := cron.Parse(schedule)
if err != nil {
return time.Unix(0, 0), fmt.Errorf("Unparseable schedule: %s : %s", schedule, err)
}
return sched.Next(now), nil
}
// getRecentUnmetScheduleTimes returns the times at which a Job should have started but, as far as we know, did not
//
// If there are too many (more than 100) unmet start times, an error is returned instead.
// If there were missed times prior to the last known start time, then those are not returned.
func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{}
sched, err := cron.Parse(sj.Spec.Schedule)
if err != nil {
return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
}
var earliestTime time.Time
if sj.Status.LastScheduleTime != nil {
earliestTime = sj.Status.LastScheduleTime.Time
} else {
// If none found, then this is either a recently created scheduledJob,
// or the active/completed info was somehow lost (contract for status
// in kubernetes says it may need to be recreated), or that we have
// started a job, but have not noticed it yet (distributed systems can
// have arbitrary delays). In any case, use the creation time of the
// ScheduledJob as last known start time.
earliestTime = sj.ObjectMeta.CreationTimestamp.Time
}
if earliestTime.After(now) {
return []time.Time{}, nil
}
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
starts = append(starts, t)
// An object might miss several starts. For example, if
// controller gets wedged on friday at 5:01pm when everyone has
// gone home, and someone comes in on tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly scheduledJob, should
// all start running with no further intervention (if the scheduledJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we do not want to try to list
// all the missed start times.
//
// I've somewhat arbitrarily picked 100, as more than 80 but
// less than "lots".
if len(starts) > 100 {
return []time.Time{}, fmt.Errorf("Too many missed start times to list")
}
}
return starts, nil
}
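// Worked example (illustrative): with an hourly schedule, a LastScheduleTime
// of 09:00, and now of 12:30, the loop above returns [10:00, 11:00, 12:00];
// SyncOne then acts only on the most recent time, 12:00.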
func isJobActive(j *batch.Job) bool {
return !isJobFinished(j)
}
func isJobFinished(j *batch.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue {
return true
}
}
return false
}
// XXX unit test this
// getJobFromTemplate makes a Job from a ScheduledJob
func getJobFromTemplate(sj *batch.ScheduledJob) (*batch.Job, error) {
// TODO: consider adding the following labels:
// nominal-start-time=$RFC_3339_DATE_OF_INTENDED_START -- for user convenience
// scheduled-job-name=$SJ_NAME -- for user convenience
labels := copyLabels(&sj.Spec.JobTemplate)
annotations := copyAnnotations(&sj.Spec.JobTemplate)
createdByRefJson, err := makeCreatedByRefJson(sj)
if err != nil {
return nil, err
}
annotations[CreatedByAnnotation] = string(createdByRefJson)
// TODO: instead of using generateName, use a deterministic hash of the nominal
// start time, to prevent same job being created twice.
// We want job names for a given nominal start time to have a predictable name, to avoid the same job being created twice.
prefix := fmt.Sprintf("%s-", sj.Name)
job := &batch.Job{
ObjectMeta: api.ObjectMeta{
Labels: labels,
Annotations: annotations,
GenerateName: prefix,
},
}
if err := api.Scheme.Convert(&sj.Spec.JobTemplate.Spec, &job.Spec); err != nil {
return nil, fmt.Errorf("unable to convert job template: %v", err)
}
return job, nil
}
// makeCreatedByRefJson makes a json string with an object reference for use in "created-by" annotation value
func makeCreatedByRefJson(object runtime.Object) (string, error) {
createdByRef, err := api.GetReference(object)
if err != nil {
return "", fmt.Errorf("unable to get controller reference: %v", err)
}
// TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients
// would be broken upon reading it. This is explicitly hardcoded to v1 to guarantee predictable deployment.
// We need to consistently handle this case of annotation versioning.
codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"})
createdByRefJson, err := runtime.Encode(codec, &api.SerializedReference{
Reference: *createdByRef,
})
if err != nil {
return "", fmt.Errorf("unable to serialize controller reference: %v", err)
}
return string(createdByRefJson), nil
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package util
import (
"fmt"
fed_clientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
@ -82,6 +84,7 @@ func (c *ClientCache) ClientConfigForVersion(version *unversioned.GroupVersion)
return nil, err
}
config.GroupVersion = negotiatedVersion
fmt.Printf(" Negotiated version %v\n", negotiatedVersion)
if version != nil {
c.configs[*version] = &config

View File

@ -352,6 +352,7 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
},
ClientForMapping: func(mapping *meta.RESTMapping) (resource.RESTClient, error) {
cfg, err := clientConfig.ClientConfig()
fmt.Printf("Mapping version: %#v", mappingVersion)
if err != nil {
return nil, err
}