From 8675e014fbbee61292ea7e10cd77e8f02d6858d0 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Fri, 20 May 2016 00:53:39 -0700 Subject: [PATCH 1/3] ScheduledJob controller --- .../app/controllermanager.go | 16 ++ .../typed/batch/unversioned/batch_client.go | 9 + pkg/client/restclient/config.go | 4 + pkg/client/unversioned/scheduledjobs.go | 14 +- pkg/controller/scheduledjob/controller.go | 253 ++++++++++++++++++ .../scheduledjob/controller_test.go | 230 ++++++++++++++++ pkg/controller/scheduledjob/injection.go | 138 ++++++++++ pkg/controller/scheduledjob/utils.go | 239 +++++++++++++++++ pkg/kubectl/cmd/util/clientcache.go | 3 + pkg/kubectl/cmd/util/factory.go | 1 + 10 files changed, 900 insertions(+), 7 deletions(-) create mode 100644 pkg/controller/scheduledjob/controller.go create mode 100644 pkg/controller/scheduledjob/controller_test.go create mode 100644 pkg/controller/scheduledjob/injection.go create mode 100644 pkg/controller/scheduledjob/utils.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 9bf1bf1bc3a..300396a7095 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -62,6 +62,7 @@ import ( replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" routecontroller "k8s.io/kubernetes/pkg/controller/route" + "k8s.io/kubernetes/pkg/controller/scheduledjob" servicecontroller "k8s.io/kubernetes/pkg/controller/service" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" @@ -381,6 +382,21 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } } + groupVersion = "batch/v2alpha1" + resources, found = resourceMap[groupVersion] + if containsVersion(versions, groupVersion) && found { + glog.Infof("Starting %s apis", groupVersion) + if containsResource(resources, "scheduledjobs") { + glog.Infof("Starting scheduledjob controller") + go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))). + Run(wait.NeverStop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + } else { + glog.Infof("Not starting %s apis", groupVersion) + } + provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration) if err != nil { glog.Fatalf("A Provisioner could not be created: %v, but one was expected. Provisioning will not work. 
This functionality is considered an early Alpha version.", err) diff --git a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go index 8f2b0e80386..783e8517139 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go @@ -18,6 +18,7 @@ package unversioned import ( api "k8s.io/kubernetes/pkg/api" + //"k8s.io/kubernetes/pkg/api/unversioned" registered "k8s.io/kubernetes/pkg/apimachinery/registered" restclient "k8s.io/kubernetes/pkg/client/restclient" ) @@ -81,9 +82,17 @@ func setConfigDefaults(config *restclient.Config) error { } // TODO: Unconditionally set the config.Version, until we fix the config. //if config.Version == "" { + // XXX why is above commented out? copyGroupVersion := g.GroupVersion config.GroupVersion = ©GroupVersion //} + // Do we need something like this: + // if config.Version == "" { + // copyGroupVersion := g.GroupVersion + // config.GroupVersion = ©GroupVersion + //} else { + // config.GroupVersion = &unversioned.GroupVersion{Group: "batch", Version: config.Version} + //} config.NegotiatedSerializer = api.Codecs diff --git a/pkg/client/restclient/config.go b/pkg/client/restclient/config.go index aa9cc520268..82c1ac2cf96 100644 --- a/pkg/client/restclient/config.go +++ b/pkg/client/restclient/config.go @@ -108,6 +108,10 @@ type Config struct { // Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst RateLimiter flowcontrol.RateLimiter + + // Version forces a specific version to be used (if registered) + // Do we need this? + // Version string } // TLSClientConfig contains settings to enable transport layer security diff --git a/pkg/client/unversioned/scheduledjobs.go b/pkg/client/unversioned/scheduledjobs.go index de07a7e5de7..5c9222cf028 100644 --- a/pkg/client/unversioned/scheduledjobs.go +++ b/pkg/client/unversioned/scheduledjobs.go @@ -55,34 +55,34 @@ var _ ScheduledJobInterface = &scheduledJobs{} // List returns a list of scheduled jobs that match the label and field selectors. func (c *scheduledJobs) List(opts api.ListOptions) (result *batch.ScheduledJobList, err error) { result = &batch.ScheduledJobList{} - err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result) + err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result) return } // Get returns information about a particular scheduled job. func (c *scheduledJobs) Get(name string) (result *batch.ScheduledJob, err error) { result = &batch.ScheduledJob{} - err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").Name(name).Do().Into(result) + err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").Name(name).Do().Into(result) return } // Create creates a new scheduled job. func (c *scheduledJobs) Create(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) { result = &batch.ScheduledJob{} - err = c.r.Post().Namespace(c.ns).Resource("scheduledJobs").Body(job).Do().Into(result) + err = c.r.Post().Namespace(c.ns).Resource("scheduledjobs").Body(job).Do().Into(result) return } // Update updates an existing scheduled job. 
func (c *scheduledJobs) Update(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) { result = &batch.ScheduledJob{} - err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).Body(job).Do().Into(result) + err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).Body(job).Do().Into(result) return } // Delete deletes a scheduled job, returns error if one occurs. func (c *scheduledJobs) Delete(name string, options *api.DeleteOptions) (err error) { - return c.r.Delete().Namespace(c.ns).Resource("scheduledJobs").Name(name).Body(options).Do().Error() + return c.r.Delete().Namespace(c.ns).Resource("scheduledjobs").Name(name).Body(options).Do().Error() } // Watch returns a watch.Interface that watches the requested scheduled jobs. @@ -90,7 +90,7 @@ func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) { return c.r.Get(). Prefix("watch"). Namespace(c.ns). - Resource("scheduledJobs"). + Resource("scheduledjobs"). VersionedParams(&opts, api.ParameterCodec). Watch() } @@ -98,6 +98,6 @@ func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) { // UpdateStatus takes the name of the scheduled job and the new status. Returns the server's representation of the scheduled job, and an error, if it occurs. func (c *scheduledJobs) UpdateStatus(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) { result = &batch.ScheduledJob{} - err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result) + err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result) return } diff --git a/pkg/controller/scheduledjob/controller.go b/pkg/controller/scheduledjob/controller.go new file mode 100644 index 00000000000..e62858103f8 --- /dev/null +++ b/pkg/controller/scheduledjob/controller.go @@ -0,0 +1,253 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledjob + +/* +I did not use watch or expectations. Those add a lot of corner cases, and we aren't +expecting a large volume of jobs or scheduledJobs. (We are favoring correctness +over scalability. If we find a single controller thread is too slow because +there are a lot of Jobs or ScheduledJobs, we we can parallelize by Namespace. +If we find the load on the API server is too high, we can use a watch and +UndeltaStore.) + +Just periodically list jobs and SJs, and then reconcile them. 
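+
+Roughly, each pass of SyncAll (below) boils down to the following sketch; the
+list calls and error handling are simplified here and the variable names are
+illustrative only:
+
+    sjs, _  := kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{})
+    jobs, _ := kubeClient.Batch().Jobs(api.NamespaceAll).List(api.ListOptions{})
+    jobsBySj := groupJobsByParent(sjs.Items, jobs.Items)
+    for _, sj := range sjs.Items {
+        SyncOne(sj, jobsBySj[sj.UID], time.Now(), jobControl, sjControl, recorder)
+    }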
+ +*/ + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/metrics" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" +) + +// Utilities for dealing with Jobs and ScheduledJobs and time. + +type ScheduledJobController struct { + kubeClient clientset.Interface + jobControl jobControlInterface + sjControl sjControlInterface + recorder record.EventRecorder +} + +func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + // TODO: remove the wrapper when every clients have moved to use the clientset. + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + + if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter()) + } + + jm := &ScheduledJobController{ + kubeClient: kubeClient, + jobControl: realJobControl{KubeClient: kubeClient}, + sjControl: &realSJControl{KubeClient: kubeClient}, + recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}), + } + + return jm +} + +func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController { + jm := NewScheduledJobController(kubeClient) + return jm +} + +// Run the main goroutine responsible for watching and syncing jobs. +func (jm *ScheduledJobController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + glog.Infof("Starting ScheduledJob Manager") + // Check things every 1 second. + go wait.Until(jm.SyncAll, 1*time.Second, stopCh) + <-stopCh + glog.Infof("Shutting down ScheduledJob Manager") +} + +// SyncAll lists all the ScheduledJobs and Jobs and reconciles them. +func (jm *ScheduledJobController) SyncAll() { + sjl, err := jm.kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{}) + if err != nil { + glog.Errorf("Error listing scheduledjobs: %v", err) + return + } + sjs := sjl.Items + glog.Info("Found %d scheduledjobs", len(sjs)) + + jl, err := jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(api.ListOptions{}) + if err != nil { + glog.Errorf("Error listing jobs") + return + } + js := jl.Items + glog.Info("Found %d jobs", len(js)) + + jobsBySj := groupJobsByParent(sjs, js) + glog.Info("Found %d groups", len(jobsBySj)) + + for _, sj := range sjs { + SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) + } +} + +// SyncOne reconciles a ScheduledJob with a list of any Jobs that it created. +// All known jobs created by "sj" should be included in "js". +// Returns a new ScheduledJobStatus if an update to status is required, else nil. +// The current time is passed in to facilitate testing. +// It has no receiver, to facilitate testing. 
+func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) { + nameForLog := fmt.Sprintf("namespace/%s/scheduledJob/%s", sj.Namespace, sj.Name) + + glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) + for _, j := range js { + found := inActiveList(sj, j.ObjectMeta.UID) + if !found { + recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name) + // We found a job object that has us as the parent, but it is not in our Active list. + // This could happen if we crashed right after creating the Job and before updating the status, + // or if our jobs list is newer than our sj status after a relist, or if someone intentionally created + // a job that they wanted us to adopt. + + // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't + // stop users from creating jobs if they have permission. It is assumed that if a + // user has permission to create a job within a namespace, then they have permission to make any scheduledJob + // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way. + // TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about? + } else { + if isJobActive(&j) { + deleteFromActiveList(&sj, j.ObjectMeta.UID) + // TODO: event to call out failure vs success. + recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name) + } + } + } + err := sjc.UpdateStatus(&sj) + if err != nil { + glog.Errorf("Unable to update status for %s: %v", nameForLog, err) + } + + if sj.Spec.Suspend { + glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) + return + } + times, err := getRecentUnmetScheduleTimes(sj, now) + if err != nil { + glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) + } + // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed. + if len(times) == 0 { + glog.V(4).Infof("No unmet start times for %s", nameForLog) + return + } + if len(times) > 1 { + glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog) + } + scheduledTime := times[len(times)-1] + tooLate := false + if sj.Spec.StartingDeadlineSeconds != nil { + tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now) + } + if tooLate { + glog.Errorf("Missed starting window for %s", nameForLog) + // TODO: generate an event for a miss. Use a warning level event because it indicates a + // problem with the controller (restart or long queue), and is not expected by user either. + // Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing + // the miss every cycle. In order to avoid sending multiple events, and to avoid processing + // the sj again and again, we could set a Status.LastMissedTime when we notice a miss. + // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp, + // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate + // and event the next time we process it, and also so the user looking at the status + // can see easily that there was a missed execution. 
+ return + } + if sj.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(sj.Status.Active) > 0 { + // Regardless which source of information we use for the set of active jobs, + // there is some risk that we won't see an active job when there is one. + // (because we haven't seen the status update to the SJ or the created pod). + // So it is theoretically possible to have concurrency with Forbid. + // As long the as the invokations are "far enough apart in time", this usually won't happen. + // + // TODO: for Forbid, we could use the same name for every execution, as a lock. + // With replace, we could use a name that is deterministic per execution time. + // But that would mean that you could not inspect prior successes or failures of Forbid jobs. + glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog) + return + } + if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent { + glog.Errorf("Not starting job for %s because of prior execution still running and concurrency policy is Replace and delete is not supported yet", nameForLog) + for _, j := range sj.Status.Active { + glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog) + if err := jc.DeleteJob(j.Namespace, j.Name); err != nil { + recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) + return + } + recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name) + } + } + + jobReq, err := getJobFromTemplate(&sj) + if err != nil { + glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err) + return + } + jobResp, err := jc.CreateJob(sj.Namespace, jobReq) + if err != nil { + recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) + return + } + recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) + + // ------------------------------------------------------------------ // + + // If this process restarts at this point (after posting a job, but + // before updating the status), then we might try to start the job on + // the next time. Actually, if we relist the SJs and Jobs on the next + // iteration of SyncAll, we might not see our own status update, and + // then post one again. So, we need to use the job name as a lock to + // prevent us from making the job twice. TODO: name the job + // deterministically. + + // Add the just-started job to the status list. + ref, err := getRef(jobResp) + if err != nil { + glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog) + } else { + sj.Status.Active = append(sj.Status.Active, *ref) + } + sj.Status.LastScheduleTime = &unversioned.Time{scheduledTime} + if err := sjc.UpdateStatus(&sj); err != nil { + glog.Infof("Unable to update status for %s: %v", nameForLog, err) + } + + return +} + +func getRef(object runtime.Object) (*api.ObjectReference, error) { + return api.GetReference(object) +} diff --git a/pkg/controller/scheduledjob/controller_test.go b/pkg/controller/scheduledjob/controller_test.go new file mode 100644 index 00000000000..32df1cee4dd --- /dev/null +++ b/pkg/controller/scheduledjob/controller_test.go @@ -0,0 +1,230 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledjob + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/types" + "testing" + "time" +) + +// schedule is hourly on the hour +var ( + onTheHour string = "0 0 * * * ?" +) + +func justBeforeTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func topOfTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func justAfterTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:01:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func justBeforeThePriorHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T08:59:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +func justAfterThePriorHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T09:01:00Z") + if err != nil { + panic("test setup error") + } + return T1 +} + +// returns a scheduledJob with some fields filled in. +func scheduledJob() batch.ScheduledJob { + return batch.ScheduledJob{ + ObjectMeta: api.ObjectMeta{ + Name: "myscheduledjob", + Namespace: "snazzycats", + UID: types.UID("1a2b3c"), + SelfLink: "/apis/extensions/v1beta1/namespaces/snazzycats/jobs/myscheduledjob", + CreationTimestamp: unversioned.Time{justBeforeTheHour()}, + }, + Spec: batch.ScheduledJobSpec{ + Schedule: "0 0 * * * * ?", + ConcurrencyPolicy: batch.AllowConcurrent, + JobTemplate: batch.JobTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: batch.JobSpec{ + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + }, + }, + } +} + +var ( + shortDead int64 = 10 + longDead int64 = 1000000 + noDead int64 = -12345 + A batch.ConcurrencyPolicy = batch.AllowConcurrent + f batch.ConcurrencyPolicy = batch.ForbidConcurrent + R batch.ConcurrencyPolicy = batch.ReplaceConcurrent + T bool = true + F bool = false +) + +func TestSyncOne_RunOrNot(t *testing.T) { + + testCases := map[string]struct { + // sj spec + concurrencyPolicy batch.ConcurrencyPolicy + suspend bool + schedule string + deadline int64 + + // sj status + ranPreviously bool + stillActive bool + + // environment + now time.Time + + // expectations + expectCreate bool + expectDelete bool + }{ + "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, + "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, + "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, + "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, + "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, + "never ran, is time, R": 
{R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, + "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F}, + "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F}, + "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F}, + + "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, + "prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, + "prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, + "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, + "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, + "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, + "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F}, + "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F}, + "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F}, + + "still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, + "still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, + "still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, + "still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F}, + "still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F}, + "still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T}, + "still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F}, + "still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F}, + "still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F}, + } + for name, tc := range testCases { + t.Log("Test case:", name) + sj := scheduledJob() + sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy + sj.Spec.Suspend = tc.suspend + sj.Spec.Schedule = tc.schedule + if tc.deadline != noDead { + sj.Spec.StartingDeadlineSeconds = &tc.deadline + } + + if tc.ranPreviously { + sj.ObjectMeta.CreationTimestamp = unversioned.Time{justBeforeThePriorHour()} + sj.Status.LastScheduleTime = &unversioned.Time{justAfterThePriorHour()} + if tc.stillActive { + sj.Status.Active = []api.ObjectReference{{}} + } + } else { + sj.ObjectMeta.CreationTimestamp = unversioned.Time{justBeforeTheHour()} + if tc.stillActive { + t.Errorf("Test setup error: this case makes no sense.") + } + } + + jc := &fakeJobControl{} + sjc := &fakeSJControl{} + recorder := record.NewFakeRecorder(10) + + SyncOne(sj, []batch.Job{}, tc.now, jc, sjc, recorder) + expectedCreates := 0 + if tc.expectCreate { + expectedCreates = 1 + } + if len(jc.Jobs) != expectedCreates { + t.Errorf("Expected %d job started, actually %v", expectedCreates, len(jc.Jobs)) + } + + expectedDeletes := 0 + if tc.expectDelete { + expectedDeletes = 1 + } + if len(jc.DeleteJobName) != expectedDeletes { + t.Errorf("Expected %d job deleted, actually %v", expectedDeletes, len(jc.DeleteJobName)) + } + + expectedEvents := 0 + if tc.expectCreate { + expectedEvents += 1 + } + if tc.expectDelete { + expectedEvents += 1 + } + if len(recorder.Events) != expectedEvents { + 
t.Errorf("Expected %d event, actually %v", expectedEvents, len(recorder.Events)) + } + } +} + +// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs, +// but over time, all jobs run as expected (assuming Allow and no deadline). diff --git a/pkg/controller/scheduledjob/injection.go b/pkg/controller/scheduledjob/injection.go new file mode 100644 index 00000000000..5b81edc049c --- /dev/null +++ b/pkg/controller/scheduledjob/injection.go @@ -0,0 +1,138 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledjob + +import ( + "sync" + + "k8s.io/kubernetes/pkg/apis/batch" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/labels" +) + +// sjControlInterface is an interface that knows how to update ScheduledJob status +// created as an interface to allow testing. +type sjControlInterface interface { + UpdateStatus(sj *batch.ScheduledJob) error +} + +// realSJControl is the default implementation of sjControlInterface. +type realSJControl struct { + KubeClient clientset.Interface +} + +var _ sjControlInterface = &realSJControl{} + +func (c *realSJControl) UpdateStatus(sj *batch.ScheduledJob) error { + _, err := c.KubeClient.Batch().ScheduledJobs(sj.Namespace).UpdateStatus(sj) + return err +} + +// fakeSJControl is the default implementation of sjControlInterface. +type fakeSJControl struct { + Updates []batch.ScheduledJob +} + +var _ sjControlInterface = &fakeSJControl{} + +func (c *fakeSJControl) UpdateStatus(sj *batch.ScheduledJob) error { + c.Updates = append(c.Updates, *sj) + return nil +} + +// ------------------------------------------------------------------ // + +// jobControlInterface is an interface that knows how to add or delete jobs +// created as an interface to allow testing. +type jobControlInterface interface { + // CreateJob creates new jobs according to the spec + CreateJob(namespace string, job *batch.Job) (*batch.Job, error) + // DeleteJob deletes the job identified by name. + // TODO: delete by UID? + DeleteJob(namespace string, name string) error +} + +// realJobControl is the default implementation of jobControlInterface. 
+type realJobControl struct { + KubeClient clientset.Interface + Recorder record.EventRecorder +} + +var _ jobControlInterface = &realJobControl{} + +func copyLabels(template *batch.JobTemplateSpec) labels.Set { + l := make(labels.Set) + for k, v := range template.Labels { + l[k] = v + } + return l +} + +func copyAnnotations(template *batch.JobTemplateSpec) labels.Set { + a := make(labels.Set) + for k, v := range template.Annotations { + a[k] = v + } + return a +} + +func (r realJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) { + return r.KubeClient.Batch().Jobs(namespace).Create(job) +} + +func (r realJobControl) DeleteJob(namespace string, name string) error { + return r.KubeClient.Batch().Jobs(namespace).Delete(name, nil) +} + +type fakeJobControl struct { + sync.Mutex + Jobs []batch.Job + DeleteJobName []string + Err error +} + +var _ jobControlInterface = &fakeJobControl{} + +func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return nil, f.Err + } + f.Jobs = append(f.Jobs, *job) + job.UID = "test-uid" + return job, nil +} + +func (f *fakeJobControl) DeleteJob(namespace string, name string) error { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return f.Err + } + f.DeleteJobName = append(f.DeleteJobName, name) + return nil +} + +func (f *fakeJobControl) Clear() { + f.Lock() + defer f.Unlock() + f.DeleteJobName = []string{} + f.Jobs = []batch.Job{} + f.Err = nil +} diff --git a/pkg/controller/scheduledjob/utils.go b/pkg/controller/scheduledjob/utils.go new file mode 100644 index 00000000000..0c4213c27aa --- /dev/null +++ b/pkg/controller/scheduledjob/utils.go @@ -0,0 +1,239 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledjob + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/golang/glog" + "github.com/robfig/cron" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" +) + +// Utilities for dealing with Jobs and ScheduledJobs and time. 
+ +const ( + CreatedByAnnotation = "kubernetes.io/created-by" +) + +func inActiveList(sj batch.ScheduledJob, uid types.UID) bool { + for _, j := range sj.Status.Active { + if j.UID == uid { + return true + } + } + return false +} + +func deleteFromActiveList(sj *batch.ScheduledJob, uid types.UID) { + if sj == nil { + return + } + newActive := []api.ObjectReference{} + for _, j := range sj.Status.Active { + if j.UID != uid { + newActive = append(newActive, j) + } + } + sj.Status.Active = newActive +} + +// getParentUIDFromJob extracts UID of job's parent and whether it was found +func getParentUIDFromJob(j batch.Job) (types.UID, bool) { + creatorRefJson, found := j.ObjectMeta.Annotations[CreatedByAnnotation] + if !found { + glog.V(4).Infof("Job with no created-by annotation, name %s namespace %s", j.Name, j.Namespace) + return types.UID(""), false + } + var sr api.SerializedReference + err := json.Unmarshal([]byte(creatorRefJson), &sr) + if err != nil { + glog.V(4).Infof("Job with unparsable created-by annotation, name %s namespace %s: %v", j.Name, j.Namespace, err) + return types.UID(""), false + } + if sr.Reference.Kind != "ScheduledJob" { + glog.V(4).Infof("Job with non-ScheduledJob parent, name %s namespace %s", j.Name, j.Namespace) + return types.UID(""), false + } + // Don't believe a job that claims to have a parent in a different namespace. + if sr.Reference.Namespace != j.Namespace { + glog.V(4).Infof("Alleged scheduledJob parent in different namespace (%s) from Job name %s namespace %s", sr.Reference.Namespace, j.Name, j.Namespace) + return types.UID(""), false + } + + return sr.Reference.UID, true +} + +// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. scheduledJob). +// It has no receiver, to facilitate testing. +func groupJobsByParent(sjs []batch.ScheduledJob, js []batch.Job) map[types.UID][]batch.Job { + jobsBySj := make(map[types.UID][]batch.Job) + for _, job := range js { + parentUID, found := getParentUIDFromJob(job) + if !found { + glog.Errorf("Unable to get uid from job %s in namespace %s", job.Name, job.Namespace) + continue + } + jobsBySj[parentUID] = append(jobsBySj[parentUID], job) + } + return jobsBySj +} + +// getNextStartTimeAfter gets the latest scheduled start time that is less than "now", or an error. +func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) { + // Using robfig/cron for cron scheduled parsing and next runtime + // computation. Not using the entire library because: + // - I want to detect when we missed a runtime due to being down. + // - How do I set the time such that I can detect the last known runtime? + // - I guess the functions could launch a go-routine to start the job and + // then return. + // How to handle concurrency control. + // How to detect changes to schedules or deleted schedules and then + // update the jobs? + + sched, err := cron.Parse(schedule) + if err != nil { + return time.Unix(0, 0), fmt.Errorf("Unparseable schedule: %s : %s", schedule, err) + } + return sched.Next(now), nil +} + +// getRecentUnmetScheduleTimes gets a slice of times that have passed when a Job should have started but did not +// +// If there are too many unstarted times, only the most recent may be returned. +// If there were missed times prior to the last known start time, then those are not returned. 
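+// For example, with an hourly on-the-hour schedule, a LastScheduleTime of 10:00,
+// and now at 12:05, this returns the 11:00 and 12:00 start times (10:00 is
+// considered already handled).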
+func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) { + starts := []time.Time{} + sched, err := cron.Parse(sj.Spec.Schedule) + if err != nil { + return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err) + } + var earliestTime time.Time + if sj.Status.LastScheduleTime != nil { + earliestTime = sj.Status.LastScheduleTime.Time + } else { + // If none found, then this is either a recently created scheduledJob, + // or the active/completed info was somehow lost (contract for status + // in kubernetes says it may need to be recreated), or that we have + // started a job, but have not noticed it yet (distributed systems can + // have arbitrary delays). In any case, use the creation time of the + // ScheduledJob as last known start time. + earliestTime = sj.ObjectMeta.CreationTimestamp.Time + } + + if earliestTime.After(now) { + return []time.Time{}, nil + } + + for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) { + starts = append(starts, t) + // An object might miss several starts. For example, if + // controller gets wedged on friday at 5:01pm when everyone has + // gone home, and someone comes in on tuesday AM and discovers + // the problem and restarts the controller, then all the hourly + // jobs, more than 80 of them for one hourly scheduledJob, should + // all start running with no further intervention (if the scheduledJob + // allows concurrency and late starts). + // + // However, if there is a bug somewhere, or incorrect clock + // on controller's server or apiservers (for setting creationTimestamp) + // then there could be so many missed start times (it could be off + // by decades or more), that it would eat up all the CPU and memory + // of this controller. In that case, we want to not try to list + // all the misseded start times. + // + // I've somewhat arbitrarily picked 100, as more than 80, but + // but less than "lots". + if len(starts) > 100 { + return []time.Time{}, fmt.Errorf("Too many missed start times to list") + } + } + return starts, nil +} + +func isJobActive(j *batch.Job) bool { + return !isJobFinished(j) +} + +func isJobFinished(j *batch.Job) bool { + for _, c := range j.Status.Conditions { + if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue { + return true + } + } + return false +} + +// XXX unit test this + +// getJobFromTemplate makes a Job from a ScheduledJob +func getJobFromTemplate(sj *batch.ScheduledJob) (*batch.Job, error) { + // TODO: consider adding the following labels: + // nominal-start-time=$RFC_3339_DATE_OF_INTENDED_START -- for user convenience + // scheduled-job-name=$SJ_NAME -- for user convenience + labels := copyLabels(&sj.Spec.JobTemplate) + annotations := copyAnnotations(&sj.Spec.JobTemplate) + createdByRefJson, err := makeCreatedByRefJson(sj) + if err != nil { + return nil, err + } + annotations[CreatedByAnnotation] = string(createdByRefJson) + // TODO: instead of using generateName, use a deterministic hash of the nominal + // start time, to prevent same job being created twice. 
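+	// (A purely hypothetical scheme, not what this patch implements: if the nominal
+	// start time were passed in, something like
+	//     name := fmt.Sprintf("%s-%d", sj.Name, scheduledTime.Unix())
+	// would make the name, and therefore the create call, deterministic per scheduled time.)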
+ // We want job names for a given nominal start time to have a predictable name to avoid + prefix := fmt.Sprintf("%s-", sj.Name) + + job := &batch.Job{ + ObjectMeta: api.ObjectMeta{ + Labels: labels, + Annotations: annotations, + GenerateName: prefix, + }, + } + if err := api.Scheme.Convert(&sj.Spec.JobTemplate.Spec, &job.Spec); err != nil { + return nil, fmt.Errorf("unable to convert job template: %v", err) + } + return job, nil +} + +// makeCreatedByRefJson makes a json string with an object reference for use in "created-by" annotation value +func makeCreatedByRefJson(object runtime.Object) (string, error) { + createdByRef, err := api.GetReference(object) + if err != nil { + return "", fmt.Errorf("unable to get controller reference: %v", err) + } + + // TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients + // would be broken upon reading it. This is explicitly hardcoded to v1 to guarantee predictable deployment. + // We need to consistently handle this case of annotation versioning. + codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"}) + + createdByRefJson, err := runtime.Encode(codec, &api.SerializedReference{ + Reference: *createdByRef, + }) + if err != nil { + return "", fmt.Errorf("unable to serialize controller reference: %v", err) + } + return string(createdByRefJson), nil +} diff --git a/pkg/kubectl/cmd/util/clientcache.go b/pkg/kubectl/cmd/util/clientcache.go index 81476cabaf0..1dc50ad8099 100644 --- a/pkg/kubectl/cmd/util/clientcache.go +++ b/pkg/kubectl/cmd/util/clientcache.go @@ -17,6 +17,8 @@ limitations under the License. package util import ( + "fmt" + fed_clientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" @@ -82,6 +84,7 @@ func (c *ClientCache) ClientConfigForVersion(version *unversioned.GroupVersion) return nil, err } config.GroupVersion = negotiatedVersion + fmt.Printf(" Negotiated version %v\n", negotiatedVersion) if version != nil { c.configs[*version] = &config diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index a961805cd84..251c8a78f5c 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -352,6 +352,7 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { }, ClientForMapping: func(mapping *meta.RESTMapping) (resource.RESTClient, error) { cfg, err := clientConfig.ClientConfig() + fmt.Printf("Mapping version: %#v", mappingVersion) if err != nil { return nil, err } From 8b2e2486413f7bea2685483bb65b789a7b7b8cc3 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Tue, 24 May 2016 13:52:19 -0700 Subject: [PATCH 2/3] Add test and doc.go --- pkg/controller/scheduledjob/doc.go | 18 + pkg/controller/scheduledjob/utils_test.go | 404 ++++++++++++++++++++++ 2 files changed, 422 insertions(+) create mode 100644 pkg/controller/scheduledjob/doc.go create mode 100644 pkg/controller/scheduledjob/utils_test.go diff --git a/pkg/controller/scheduledjob/doc.go b/pkg/controller/scheduledjob/doc.go new file mode 100644 index 00000000000..3fafb54d088 --- /dev/null +++ b/pkg/controller/scheduledjob/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package scheduledjob contains the controller for ScheduledJob objects. +package scheduledjob diff --git a/pkg/controller/scheduledjob/utils_test.go b/pkg/controller/scheduledjob/utils_test.go new file mode 100644 index 00000000000..78bfd02a4bf --- /dev/null +++ b/pkg/controller/scheduledjob/utils_test.go @@ -0,0 +1,404 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduledjob + +import ( + //"fmt" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/types" + //"k8s.io/kubernetes/pkg/controller" + // "k8s.io/kubernetes/pkg/util/rand" +) + +func TestGetJobFromTemplate(t *testing.T) { + // getJobFromTemplate() needs to take the job template and copy the labels and annotations + // and other fields, and add a created-by reference. 
+ + var one int64 = 1 + var no bool = false + + sj := batch.ScheduledJob{ + ObjectMeta: api.ObjectMeta{ + Name: "myscheduledjob", + Namespace: "snazzycats", + UID: types.UID("1a2b3c"), + SelfLink: "/apis/extensions/v1beta1/namespaces/snazzycats/jobs/myscheduledjob", + }, + Spec: batch.ScheduledJobSpec{ + Schedule: "0 0 * * * * ?", + ConcurrencyPolicy: batch.AllowConcurrent, + JobTemplate: batch.JobTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: batch.JobSpec{ + ActiveDeadlineSeconds: &one, + ManualSelector: &no, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + }, + }, + } + + var job *batch.Job + job, err := getJobFromTemplate(&sj) + if err != nil { + t.Errorf("Did not expect error: %s", err) + } + if job.ObjectMeta.GenerateName != "myscheduledjob-" { + t.Errorf("Wrong GenerateName") + } + if len(job.ObjectMeta.Labels) != 1 { + t.Errorf("Wrong number of labels") + } + if len(job.ObjectMeta.Annotations) != 2 { + t.Errorf("Wrong number of annotations") + } + v, ok := job.ObjectMeta.Annotations["kubernetes.io/created-by"] + if !ok { + t.Errorf("Missing created-by annotation") + } + expectedCreatedBy := `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"snazzycats","name":"myscheduledjob","uid":"1a2b3c","apiVersion":"extensions"}} +` + if len(v) != len(expectedCreatedBy) { + t.Errorf("Wrong length for created-by annotation, expected %v got %v", len(expectedCreatedBy), len(v)) + } + if v != expectedCreatedBy { + t.Errorf("Wrong value for created-by annotation, expected %v got %v", expectedCreatedBy, v) + } +} + +func TestGetParentUIDFromJob(t *testing.T) { + j := &batch.Job{ + ObjectMeta: api.ObjectMeta{ + Name: "foobar", + Namespace: api.NamespaceDefault, + }, + Spec: batch.JobSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{{ + Type: batch.JobComplete, + Status: api.ConditionTrue, + }}, + }, + } + { + // Case 1: No UID annotation + _, found := getParentUIDFromJob(*j) + + if found { + t.Errorf("Unexpectedly found uid") + } + } + { + // Case 2: Has UID annotation + j.ObjectMeta.Annotations = map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"default","name":"pi","uid":"5ef034e0-1890-11e6-8935-42010af0003e","apiVersion":"extensions","resourceVersion":"427339"}}`} + + expectedUID := types.UID("5ef034e0-1890-11e6-8935-42010af0003e") + + uid, found := getParentUIDFromJob(*j) + if !found { + t.Errorf("Unexpectedly did not find uid") + } else if uid != expectedUID { + t.Errorf("Wrong UID: %v", uid) + } + } + +} + +func TestGroupJobsByParent(t *testing.T) { + uid1 := types.UID("11111111-1111-1111-1111-111111111111") + uid2 := types.UID("22222222-2222-2222-2222-222222222222") + uid3 := types.UID("33333333-3333-3333-3333-333333333333") + createdBy1 := map[string]string{"kubernetes.io/created-by": 
`{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"x","name":"pi","uid":"11111111-1111-1111-1111-111111111111","apiVersion":"extensions","resourceVersion":"111111"}}`} + createdBy2 := map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"x","name":"pi","uid":"22222222-2222-2222-2222-222222222222","apiVersion":"extensions","resourceVersion":"222222"}}`} + createdBy3 := map[string]string{"kubernetes.io/created-by": `{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ScheduledJob","namespace":"y","name":"pi","uid":"33333333-3333-3333-3333-333333333333","apiVersion":"extensions","resourceVersion":"333333"}}`} + noCreatedBy := map[string]string{} + + { + // Case 1: There are no jobs and scheduledJobs + sjs := []batch.ScheduledJob{} + js := []batch.Job{} + jobsBySj := groupJobsByParent(sjs, js) + if len(jobsBySj) != 0 { + t.Errorf("Wrong number of items in map") + } + } + + { + // Case 2: there is one controller with no job. + sjs := []batch.ScheduledJob{ + {ObjectMeta: api.ObjectMeta{Name: "e", Namespace: "x", UID: uid1}}, + } + js := []batch.Job{} + jobsBySj := groupJobsByParent(sjs, js) + if len(jobsBySj) != 0 { + t.Errorf("Wrong number of items in map") + } + } + + { + // Case 3: there is one controller with one job it created. + sjs := []batch.ScheduledJob{ + {ObjectMeta: api.ObjectMeta{Name: "e", Namespace: "x", UID: uid1}}, + } + js := []batch.Job{ + {ObjectMeta: api.ObjectMeta{Name: "a", Namespace: "x", Annotations: createdBy1}}, + } + jobsBySj := groupJobsByParent(sjs, js) + + if len(jobsBySj) != 1 { + t.Errorf("Wrong number of items in map") + } + jobList1, found := jobsBySj[uid1] + if !found { + t.Errorf("Key not found") + } + if len(jobList1) != 1 { + t.Errorf("Wrong number of items in map") + } + } + + { + // Case 4: Two namespaces, one has two jobs from one controller, other has 3 jobs from two controllers. + // There are also two jobs with no created-by annotation. 
+ js := []batch.Job{ + {ObjectMeta: api.ObjectMeta{Name: "a", Namespace: "x", Annotations: createdBy1}}, + {ObjectMeta: api.ObjectMeta{Name: "b", Namespace: "x", Annotations: createdBy2}}, + {ObjectMeta: api.ObjectMeta{Name: "c", Namespace: "x", Annotations: createdBy1}}, + {ObjectMeta: api.ObjectMeta{Name: "d", Namespace: "x", Annotations: noCreatedBy}}, + {ObjectMeta: api.ObjectMeta{Name: "a", Namespace: "y", Annotations: createdBy3}}, + {ObjectMeta: api.ObjectMeta{Name: "b", Namespace: "y", Annotations: createdBy3}}, + {ObjectMeta: api.ObjectMeta{Name: "d", Namespace: "y", Annotations: noCreatedBy}}, + } + sjs := []batch.ScheduledJob{ + {ObjectMeta: api.ObjectMeta{Name: "e", Namespace: "x", UID: uid1}}, + {ObjectMeta: api.ObjectMeta{Name: "f", Namespace: "x", UID: uid2}}, + {ObjectMeta: api.ObjectMeta{Name: "g", Namespace: "y", UID: uid3}}, + } + + jobsBySj := groupJobsByParent(sjs, js) + + if len(jobsBySj) != 3 { + t.Errorf("Wrong number of items in map") + } + jobList1, found := jobsBySj[uid1] + if !found { + t.Errorf("Key not found") + } + if len(jobList1) != 2 { + t.Errorf("Wrong number of items in map") + } + jobList2, found := jobsBySj[uid2] + if !found { + t.Errorf("Key not found") + } + if len(jobList2) != 1 { + t.Errorf("Wrong number of items in map") + } + jobList3, found := jobsBySj[uid3] + if !found { + t.Errorf("Key not found") + } + if len(jobList3) != 2 { + t.Errorf("Wrong number of items in map") + } + } + +} + +func TestGetRecentUnmetScheduleTimes(t *testing.T) { + // schedule is hourly on the hour + schedule := "0 0 * * * ?" + // T1 is a scheduled start time of that schedule + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") + if err != nil { + t.Errorf("test setup error: %v", err) + } + // T2 is a scheduled start time of that schedule after T1 + T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z") + if err != nil { + t.Errorf("test setup error: %v", err) + } + + sj := batch.ScheduledJob{ + ObjectMeta: api.ObjectMeta{ + Name: "myscheduledjob", + Namespace: api.NamespaceDefault, + UID: types.UID("1a2b3c"), + }, + Spec: batch.ScheduledJobSpec{ + Schedule: schedule, + ConcurrencyPolicy: batch.AllowConcurrent, + JobTemplate: batch.JobTemplateSpec{}, + }, + } + { + // Case 1: no known start times, and none needed yet. + // Creation time is before T1. + sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + // Current time is more than creation time, but less than T1. + now := T1.Add(-7 * time.Minute) + times, err := getRecentUnmetScheduleTimes(sj, now) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 0 { + t.Errorf("expected no start times, got: %v", times) + } + } + { + // Case 2: no known start times, and one needed. + // Creation time is before T1. + sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + // Current time is after T1 + now := T1.Add(2 * time.Second) + times, err := getRecentUnmetScheduleTimes(sj, now) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 1 { + t.Errorf("expected 1 start time, got: %v", times) + } else if !times[0].Equal(T1) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + } + { + // Case 3: known LastScheduleTime, no start needed. + // Creation time is before T1. + sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + // Status shows a start at the expected time. 
+ sj.Status.LastScheduleTime = &unversioned.Time{T1} + // Current time is after T1 + now := T1.Add(2 * time.Minute) + times, err := getRecentUnmetScheduleTimes(sj, now) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 0 { + t.Errorf("expected 0 start times, got: , got: %v", times) + } + } + { + // Case 4: known LastScheduleTime, a start needed + // Creation time is before T1. + sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + // Status shows a start at the expected time. + sj.Status.LastScheduleTime = &unversioned.Time{T1} + // Current time is after T1 and after T2 + now := T2.Add(5 * time.Minute) + times, err := getRecentUnmetScheduleTimes(sj, now) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 1 { + t.Errorf("expected 2 start times, got: , got: %v", times) + } else if !times[0].Equal(T2) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + } + { + // Case 5: known LastScheduleTime, two starts needed + sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-2 * time.Hour)} + sj.Status.LastScheduleTime = &unversioned.Time{T1.Add(-1 * time.Hour)} + // Current time is after T1 and after T2 + now := T2.Add(5 * time.Minute) + times, err := getRecentUnmetScheduleTimes(sj, now) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 2 { + t.Errorf("expected 2 start times, got: , got: %v", times) + } else { + if !times[0].Equal(T1) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + if !times[1].Equal(T2) { + t.Errorf("expected: %v, got: %v", T2, times[1]) + } + } + } + { + // Case 6: now is way way ahead of last start time. + sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-2 * time.Hour)} + sj.Status.LastScheduleTime = &unversioned.Time{T1.Add(-1 * time.Hour)} + now := T2.Add(10 * 24 * time.Hour) + _, err := getRecentUnmetScheduleTimes(sj, now) + if err == nil { + t.Errorf("unexpected lack of error") + } + } + +} + +func TestIsJobFinished(t *testing.T) { + job := &batch.Job{ + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{{ + Type: batch.JobComplete, + Status: api.ConditionTrue, + }}, + }, + } + + if !isJobFinished(job) { + t.Error("Job was expected to be finished") + } + + job.Status.Conditions[0].Status = api.ConditionFalse + if isJobFinished(job) { + t.Error("Job was not expected to be finished") + } + + job.Status.Conditions[0].Status = api.ConditionUnknown + if isJobFinished(job) { + t.Error("Job was not expected to be finished") + } +} From da57c93a8a68f993237466c2004ad810de47f47d Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Mon, 18 Jul 2016 13:24:57 -0700 Subject: [PATCH 3/3] Fix errors, verification and test failures; add unit test for sj UpdateStatus --- .../typed/batch/unversioned/batch_client.go | 9 - pkg/controller/job/jobcontroller.go | 13 +- pkg/controller/job/jobcontroller_test.go | 25 --- pkg/controller/job/utils.go | 31 +++ pkg/controller/job/utils_test.go | 49 +++++ pkg/controller/scheduledjob/controller.go | 20 +- .../scheduledjob/controller_test.go | 208 ++++++++++++++++-- pkg/controller/scheduledjob/doc.go | 2 +- pkg/controller/scheduledjob/injection.go | 2 +- pkg/controller/scheduledjob/utils.go | 20 +- pkg/controller/scheduledjob/utils_test.go | 47 +--- pkg/kubectl/cmd/util/clientcache.go | 3 - pkg/kubectl/cmd/util/factory.go | 1 - 13 files changed, 294 insertions(+), 136 deletions(-) create mode 100644 pkg/controller/job/utils.go create mode 100644 pkg/controller/job/utils_test.go diff --git 
a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go index 783e8517139..8f2b0e80386 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go @@ -18,7 +18,6 @@ package unversioned import ( api "k8s.io/kubernetes/pkg/api" - //"k8s.io/kubernetes/pkg/api/unversioned" registered "k8s.io/kubernetes/pkg/apimachinery/registered" restclient "k8s.io/kubernetes/pkg/client/restclient" ) @@ -82,17 +81,9 @@ func setConfigDefaults(config *restclient.Config) error { } // TODO: Unconditionally set the config.Version, until we fix the config. //if config.Version == "" { - // XXX why is above commented out? copyGroupVersion := g.GroupVersion config.GroupVersion = ©GroupVersion //} - // Do we need something like this: - // if config.Version == "" { - // copyGroupVersion := g.GroupVersion - // config.GroupVersion = ©GroupVersion - //} else { - // config.GroupVersion = &unversioned.GroupVersion{Group: "batch", Version: config.Version} - //} config.NegotiatedSerializer = api.Codecs diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index b4639dec589..6e6b1b3691f 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -113,7 +113,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie framework.ResourceEventHandlerFuncs{ AddFunc: jm.enqueueController, UpdateFunc: func(old, cur interface{}) { - if job := cur.(*batch.Job); !isJobFinished(job) { + if job := cur.(*batch.Job); !IsJobFinished(job) { jm.enqueueController(job) } }, @@ -347,7 +347,7 @@ func (jm *JobController) syncJob(key string) error { job.Status.StartTime = &now } // if job was finished previously, we don't want to redo the termination - if isJobFinished(&job) { + if IsJobFinished(&job) { return nil } if pastActiveDeadline(&job) { @@ -559,15 +559,6 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int { return result } -func isJobFinished(j *batch.Job) bool { - for _, c := range j.Status.Conditions { - if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue { - return true - } - } - return false -} - // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. 
type byCreationTimestamp []batch.Job diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 4468d544051..b2a57d57601 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -657,31 +657,6 @@ func TestWatchJobs(t *testing.T) { <-received } -func TestIsJobFinished(t *testing.T) { - job := &batch.Job{ - Status: batch.JobStatus{ - Conditions: []batch.JobCondition{{ - Type: batch.JobComplete, - Status: api.ConditionTrue, - }}, - }, - } - - if !isJobFinished(job) { - t.Error("Job was expected to be finished") - } - - job.Status.Conditions[0].Status = api.ConditionFalse - if isJobFinished(job) { - t.Error("Job was not expected to be finished") - } - - job.Status.Conditions[0].Status = api.ConditionUnknown - if isJobFinished(job) { - t.Error("Job was not expected to be finished") - } -} - func TestWatchPods(t *testing.T) { testJob := newJob(2, 2) clientset := fake.NewSimpleClientset(testJob) diff --git a/pkg/controller/job/utils.go b/pkg/controller/job/utils.go new file mode 100644 index 00000000000..64d1f4268a8 --- /dev/null +++ b/pkg/controller/job/utils.go @@ -0,0 +1,31 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/batch" +) + +func IsJobFinished(j *batch.Job) bool { + for _, c := range j.Status.Conditions { + if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue { + return true + } + } + return false +} diff --git a/pkg/controller/job/utils_test.go b/pkg/controller/job/utils_test.go new file mode 100644 index 00000000000..59f2b63f19f --- /dev/null +++ b/pkg/controller/job/utils_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package job + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/batch" +) + +func TestIsJobFinished(t *testing.T) { + job := &batch.Job{ + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{{ + Type: batch.JobComplete, + Status: api.ConditionTrue, + }}, + }, + } + + if !IsJobFinished(job) { + t.Error("Job was expected to be finished") + } + + job.Status.Conditions[0].Status = api.ConditionFalse + if IsJobFinished(job) { + t.Error("Job was not expected to be finished") + } + + job.Status.Conditions[0].Status = api.ConditionUnknown + if IsJobFinished(job) { + t.Error("Job was not expected to be finished") + } +} diff --git a/pkg/controller/scheduledjob/controller.go b/pkg/controller/scheduledjob/controller.go index e62858103f8..87cb9891736 100644 --- a/pkg/controller/scheduledjob/controller.go +++ b/pkg/controller/scheduledjob/controller.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -84,8 +85,8 @@ func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *Schedu func (jm *ScheduledJobController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() glog.Infof("Starting ScheduledJob Manager") - // Check things every 1 second. - go wait.Until(jm.SyncAll, 1*time.Second, stopCh) + // Check things every 10 second. + go wait.Until(jm.SyncAll, 10*time.Second, stopCh) <-stopCh glog.Infof("Shutting down ScheduledJob Manager") } @@ -118,13 +119,11 @@ func (jm *ScheduledJobController) SyncAll() { // SyncOne reconciles a ScheduledJob with a list of any Jobs that it created. // All known jobs created by "sj" should be included in "js". -// Returns a new ScheduledJobStatus if an update to status is required, else nil. // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing. func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) { - nameForLog := fmt.Sprintf("namespace/%s/scheduledJob/%s", sj.Namespace, sj.Name) + nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) - glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) for _, j := range js { found := inActiveList(sj, j.ObjectMeta.UID) if !found { @@ -140,7 +139,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way. // TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about? } else { - if isJobActive(&j) { + if job.IsJobFinished(&j) { deleteFromActiveList(&sj, j.ObjectMeta.UID) // TODO: event to call out failure vs success. 
recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name) @@ -152,7 +151,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl glog.Errorf("Unable to update status for %s: %v", nameForLog, err) } - if sj.Spec.Suspend { + if sj.Spec.Suspend != nil && *sj.Spec.Suspend { glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) return } @@ -200,7 +199,6 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl return } if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent { - glog.Errorf("Not starting job for %s because of prior execution still running and concurrency policy is Replace and delete is not supported yet", nameForLog) for _, j := range sj.Status.Active { glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog) if err := jc.DeleteJob(j.Namespace, j.Name); err != nil { @@ -231,7 +229,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl // iteration of SyncAll, we might not see our own status update, and // then post one again. So, we need to use the job name as a lock to // prevent us from making the job twice. TODO: name the job - // deterministically. + // deterministically (via hash of its scheduled time). // Add the just-started job to the status list. ref, err := getRef(jobResp) @@ -240,7 +238,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl } else { sj.Status.Active = append(sj.Status.Active, *ref) } - sj.Status.LastScheduleTime = &unversioned.Time{scheduledTime} + sj.Status.LastScheduleTime = &unversioned.Time{Time: scheduledTime} if err := sjc.UpdateStatus(&sj); err != nil { glog.Infof("Unable to update status for %s: %v", nameForLog, err) } diff --git a/pkg/controller/scheduledjob/controller_test.go b/pkg/controller/scheduledjob/controller_test.go index 32df1cee4dd..a6bfffc76c3 100644 --- a/pkg/controller/scheduledjob/controller_test.go +++ b/pkg/controller/scheduledjob/controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,13 +17,14 @@ limitations under the License. 
package scheduledjob import ( + "testing" + "time" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/types" - "testing" - "time" ) // schedule is hourly on the hour @@ -78,8 +79,8 @@ func scheduledJob() batch.ScheduledJob { Name: "myscheduledjob", Namespace: "snazzycats", UID: types.UID("1a2b3c"), - SelfLink: "/apis/extensions/v1beta1/namespaces/snazzycats/jobs/myscheduledjob", - CreationTimestamp: unversioned.Time{justBeforeTheHour()}, + SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/jobs/myscheduledjob", + CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()}, }, Spec: batch.ScheduledJobSpec{ Schedule: "0 0 * * * * ?", @@ -89,25 +90,41 @@ func scheduledJob() batch.ScheduledJob { Labels: map[string]string{"a": "b"}, Annotations: map[string]string{"x": "y"}, }, - Spec: batch.JobSpec{ - Template: api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Image: "foo/bar"}, - }, - }, - }, + Spec: jobSpec(), + }, + }, + } +} + +func jobSpec() batch.JobSpec { + return batch.JobSpec{ + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Image: "foo/bar"}, }, }, }, } } +func newJob(UID string) batch.Job { + return batch.Job{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(UID), + Name: "foobar", + Namespace: api.NamespaceDefault, + SelfLink: "/apis/batch/v1/namespaces/snazzycats/jobs/myjob", + }, + Spec: jobSpec(), + } +} + var ( shortDead int64 = 10 longDead int64 = 1000000 @@ -173,20 +190,20 @@ func TestSyncOne_RunOrNot(t *testing.T) { t.Log("Test case:", name) sj := scheduledJob() sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy - sj.Spec.Suspend = tc.suspend + sj.Spec.Suspend = &tc.suspend sj.Spec.Schedule = tc.schedule if tc.deadline != noDead { sj.Spec.StartingDeadlineSeconds = &tc.deadline } if tc.ranPreviously { - sj.ObjectMeta.CreationTimestamp = unversioned.Time{justBeforeThePriorHour()} - sj.Status.LastScheduleTime = &unversioned.Time{justAfterThePriorHour()} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()} + sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} if tc.stillActive { sj.Status.Active = []api.ObjectReference{{}} } } else { - sj.ObjectMeta.CreationTimestamp = unversioned.Time{justBeforeTheHour()} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} if tc.stillActive { t.Errorf("Test setup error: this case makes no sense.") } @@ -228,3 +245,150 @@ func TestSyncOne_RunOrNot(t *testing.T) { // TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs, // but over time, all jobs run as expected (assuming Allow and no deadline). 
+ +// TestSyncOne_Status tests sj.UpdateStatus in SyncOne +func TestSyncOne_Status(t *testing.T) { + finishedJob := newJob("1") + finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batch.JobCondition{Type: batch.JobComplete, Status: api.ConditionTrue}) + unexpectedJob := newJob("2") + + testCases := map[string]struct { + // sj spec + concurrencyPolicy batch.ConcurrencyPolicy + suspend bool + schedule string + deadline int64 + + // sj status + ranPreviously bool + hasFinishedJob bool + + // environment + now time.Time + hasUnexpectedJob bool + + // expectations + expectCreate bool + expectDelete bool + }{ + "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F}, + "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F}, + "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F}, + "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F}, + "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F}, + "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F}, + "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F}, + "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F}, + "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, T, F}, + + "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F}, + "prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F}, + "prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F}, + "prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F}, + "prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F}, + "prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F}, + + "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, suspended": 
{A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F}, + "prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F}, + "prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F}, + "prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F}, + "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F}, + "prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F}, + "prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F}, + "prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F}, + "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, T, F}, + "prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, T, F}, + "prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, T, F}, + } + + for name, tc := range testCases { + t.Log("Test case:", name) + // Setup the test + sj := scheduledJob() + sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy + sj.Spec.Suspend = &tc.suspend + sj.Spec.Schedule = tc.schedule + if tc.deadline != noDead { + sj.Spec.StartingDeadlineSeconds = &tc.deadline + } + if tc.ranPreviously { + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()} + sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} + } else { + if tc.hasFinishedJob || tc.hasUnexpectedJob { + t.Errorf("Test setup error: this case makes no sense.") + } + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} + } + jobs := []batch.Job{} + if tc.hasFinishedJob { + ref, err := getRef(&finishedJob) + if err != nil { + t.Errorf("Test setup error: failed to get job's ref: %v.", err) + } + sj.Status.Active = []api.ObjectReference{*ref} + jobs = append(jobs, finishedJob) + } + if tc.hasUnexpectedJob { + jobs = append(jobs, unexpectedJob) + } + + jc := &fakeJobControl{} + sjc := &fakeSJControl{} + recorder := record.NewFakeRecorder(10) + + // Run the code + SyncOne(sj, jobs, tc.now, jc, sjc, recorder) + + // Status update happens once when ranging through job list, and another one if create jobs. + expectUpdates := 1 + // Events happens when there's unexpected / finished jobs, and upon job creation / deletion. 
+ expectedEvents := 0 + if tc.expectCreate { + expectUpdates++ + expectedEvents++ + } + if tc.expectDelete { + expectedEvents++ + } + if tc.hasFinishedJob { + expectedEvents++ + } + if tc.hasUnexpectedJob { + expectedEvents++ + } + + if len(recorder.Events) != expectedEvents { + t.Errorf("Expected %d event, actually %v: %#v", expectedEvents, len(recorder.Events), recorder.Events) + } + + if expectUpdates != len(sjc.Updates) { + t.Errorf("expected %d status updates, actually %d", expectUpdates, len(sjc.Updates)) + } + + if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) { + t.Errorf("Expected finished job removed from active list, actually active list = %#v.", sjc.Updates[0].Status.Active) + } + + if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) { + t.Errorf("Expected unexpected job not added to active list, actually active list = %#v.", sjc.Updates[0].Status.Active) + } + + if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) { + t.Errorf("Expected LastScheduleTime updated to %s, got %s.", topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime) + } + } +} diff --git a/pkg/controller/scheduledjob/doc.go b/pkg/controller/scheduledjob/doc.go index 3fafb54d088..649d7e4404e 100644 --- a/pkg/controller/scheduledjob/doc.go +++ b/pkg/controller/scheduledjob/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/controller/scheduledjob/injection.go b/pkg/controller/scheduledjob/injection.go index 5b81edc049c..fed5c06aa51 100644 --- a/pkg/controller/scheduledjob/injection.go +++ b/pkg/controller/scheduledjob/injection.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/controller/scheduledjob/utils.go b/pkg/controller/scheduledjob/utils.go index 0c4213c27aa..2adc6bfdf80 100644 --- a/pkg/controller/scheduledjob/utils.go +++ b/pkg/controller/scheduledjob/utils.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -119,9 +119,9 @@ func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) { return sched.Next(now), nil } -// getRecentUnmetScheduleTimes gets a slice of times that have passed when a Job should have started but did not +// getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not. // -// If there are too many unstarted times, only the most recent may be returned. +// If there are too many (>100) unstarted times, just give up and return an empty slice. // If there were missed times prior to the last known start time, then those are not returned. func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) { starts := []time.Time{} @@ -166,25 +166,13 @@ func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.T // I've somewhat arbitrarily picked 100, as more than 80, but // but less than "lots". 
if len(starts) > 100 { + // We can't get the most recent times so just return an empty slice return []time.Time{}, fmt.Errorf("Too many missed start times to list") } } return starts, nil } -func isJobActive(j *batch.Job) bool { - return !isJobFinished(j) -} - -func isJobFinished(j *batch.Job) bool { - for _, c := range j.Status.Conditions { - if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue { - return true - } - } - return false -} - // XXX unit test this // getJobFromTemplate makes a Job from a ScheduledJob diff --git a/pkg/controller/scheduledjob/utils_test.go b/pkg/controller/scheduledjob/utils_test.go index 78bfd02a4bf..226015cf9ed 100644 --- a/pkg/controller/scheduledjob/utils_test.go +++ b/pkg/controller/scheduledjob/utils_test.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -283,7 +283,7 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { { // Case 1: no known start times, and none needed yet. // Creation time is before T1. - sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)} // Current time is more than creation time, but less than T1. now := T1.Add(-7 * time.Minute) times, err := getRecentUnmetScheduleTimes(sj, now) @@ -297,7 +297,7 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { { // Case 2: no known start times, and one needed. // Creation time is before T1. - sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)} // Current time is after T1 now := T1.Add(2 * time.Second) times, err := getRecentUnmetScheduleTimes(sj, now) @@ -313,9 +313,9 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { { // Case 3: known LastScheduleTime, no start needed. // Creation time is before T1. - sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)} // Status shows a start at the expected time. - sj.Status.LastScheduleTime = &unversioned.Time{T1} + sj.Status.LastScheduleTime = &unversioned.Time{Time: T1} // Current time is after T1 now := T1.Add(2 * time.Minute) times, err := getRecentUnmetScheduleTimes(sj, now) @@ -329,9 +329,9 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { { // Case 4: known LastScheduleTime, a start needed // Creation time is before T1. - sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-10 * time.Minute)} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-10 * time.Minute)} // Status shows a start at the expected time. 
- sj.Status.LastScheduleTime = &unversioned.Time{T1} + sj.Status.LastScheduleTime = &unversioned.Time{Time: T1} // Current time is after T1 and after T2 now := T2.Add(5 * time.Minute) times, err := getRecentUnmetScheduleTimes(sj, now) @@ -346,8 +346,8 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { } { // Case 5: known LastScheduleTime, two starts needed - sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-2 * time.Hour)} - sj.Status.LastScheduleTime = &unversioned.Time{T1.Add(-1 * time.Hour)} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-2 * time.Hour)} + sj.Status.LastScheduleTime = &unversioned.Time{Time: T1.Add(-1 * time.Hour)} // Current time is after T1 and after T2 now := T2.Add(5 * time.Minute) times, err := getRecentUnmetScheduleTimes(sj, now) @@ -367,8 +367,8 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { } { // Case 6: now is way way ahead of last start time. - sj.ObjectMeta.CreationTimestamp = unversioned.Time{T1.Add(-2 * time.Hour)} - sj.Status.LastScheduleTime = &unversioned.Time{T1.Add(-1 * time.Hour)} + sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: T1.Add(-2 * time.Hour)} + sj.Status.LastScheduleTime = &unversioned.Time{Time: T1.Add(-1 * time.Hour)} now := T2.Add(10 * 24 * time.Hour) _, err := getRecentUnmetScheduleTimes(sj, now) if err == nil { @@ -377,28 +377,3 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { } } - -func TestIsJobFinished(t *testing.T) { - job := &batch.Job{ - Status: batch.JobStatus{ - Conditions: []batch.JobCondition{{ - Type: batch.JobComplete, - Status: api.ConditionTrue, - }}, - }, - } - - if !isJobFinished(job) { - t.Error("Job was expected to be finished") - } - - job.Status.Conditions[0].Status = api.ConditionFalse - if isJobFinished(job) { - t.Error("Job was not expected to be finished") - } - - job.Status.Conditions[0].Status = api.ConditionUnknown - if isJobFinished(job) { - t.Error("Job was not expected to be finished") - } -} diff --git a/pkg/kubectl/cmd/util/clientcache.go b/pkg/kubectl/cmd/util/clientcache.go index 1dc50ad8099..81476cabaf0 100644 --- a/pkg/kubectl/cmd/util/clientcache.go +++ b/pkg/kubectl/cmd/util/clientcache.go @@ -17,8 +17,6 @@ limitations under the License. package util import ( - "fmt" - fed_clientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" @@ -84,7 +82,6 @@ func (c *ClientCache) ClientConfigForVersion(version *unversioned.GroupVersion) return nil, err } config.GroupVersion = negotiatedVersion - fmt.Printf(" Negotiated version %v\n", negotiatedVersion) if version != nil { c.configs[*version] = &config diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index 251c8a78f5c..a961805cd84 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -352,7 +352,6 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { }, ClientForMapping: func(mapping *meta.RESTMapping) (resource.RESTClient, error) { cfg, err := clientConfig.ClientConfig() - fmt.Printf("Mapping version: %#v", mappingVersion) if err != nil { return nil, err }
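The comment in SyncOne about using the job name as a lock, together with the TODO to name the job deterministically (via hash of its scheduled time), points at deriving each run's Job name from its scheduled start, so that a racing second SyncAll pass gets an AlreadyExists error from the apiserver instead of creating a duplicate run. The sketch below is illustrative only and not part of either patch; getTimeHash and jobNameForScheduledTime are hypothetical helpers, shown here with the Unix timestamp of the scheduled start as one possible hash.

package main

import (
	"fmt"
	"time"
)

// getTimeHash is an assumed helper: the Unix timestamp of the scheduled
// start is stable across controller restarts, so the derived name is too.
func getTimeHash(scheduledTime time.Time) int64 {
	return scheduledTime.Unix()
}

// jobNameForScheduledTime builds a per-run name such as
// "myscheduledjob-1467331200" (ScheduledJob name plus Unix seconds).
func jobNameForScheduledTime(sjName string, scheduledTime time.Time) string {
	return fmt.Sprintf("%s-%d", sjName, getTimeHash(scheduledTime))
}

func main() {
	t := time.Date(2016, time.July, 1, 0, 0, 0, 0, time.UTC)
	fmt.Println(jobNameForScheduledTime("myscheduledjob", t)) // myscheduledjob-1467331200
}

Because the same scheduled time always yields the same name, retrying the create after a missed status update is harmless, which is the locking behavior the comment relies on.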