diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7f99a8f25bd..59458cb0e37 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/node" "k8s.io/kubernetes/pkg/controller/persistentvolume" @@ -66,6 +67,7 @@ type CMServer struct { ConcurrentEndpointSyncs int ConcurrentRCSyncs int ConcurrentDSCSyncs int + ConcurrentJobSyncs int ServiceSyncPeriod time.Duration NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -104,6 +106,7 @@ func NewCMServer() *CMServer { ConcurrentEndpointSyncs: 5, ConcurrentRCSyncs: 5, ConcurrentDSCSyncs: 2, + ConcurrentJobSyncs: 5, ServiceSyncPeriod: 5 * time.Minute, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, @@ -238,6 +241,9 @@ func (s *CMServer) Run(_ []string) error { go daemon.NewDaemonSetsController(kubeClient). Run(s.ConcurrentDSCSyncs, util.NeverStop) + go job.NewJobManager(kubeClient). + Run(s.ConcurrentJobSyncs, util.NeverStop) + cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { glog.Fatalf("Cloud provider could not be initialized: %v", err) diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index c01f628d7ce..56c914d7e41 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -139,6 +139,13 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { j.RollingUpdate = &rollingUpdate } }, + func(j *experimental.JobSpec, c fuzz.Continue) { + c.FuzzNoCustom(j) // fuzz self without calling this function again + completions := c.Rand.Int() + parallelism := c.Rand.Int() + j.Completions = &completions + j.Parallelism = ¶llelism + }, func(j *api.List, c fuzz.Continue) { c.FuzzNoCustom(j) // fuzz self without calling this function again // TODO: uncomment when round trip starts from a versioned object diff --git a/pkg/apis/experimental/v1/defaults.go b/pkg/apis/experimental/v1/defaults.go index cc141513c25..c03a419f0c2 100644 --- a/pkg/apis/experimental/v1/defaults.go +++ b/pkg/apis/experimental/v1/defaults.go @@ -75,5 +75,15 @@ func addDefaultingFuncs() { *obj.Spec.UniqueLabelKey = "deployment.kubernetes.io/podTemplateHash" } }, + func(obj *Job) { + if obj.Spec.Completions == nil { + completions := 1 + obj.Spec.Completions = &completions + } + if obj.Spec.Parallelism == nil { + parallelism := 2 + obj.Spec.Parallelism = ¶llelism + } + }, ) } diff --git a/pkg/apis/experimental/v1/defaults_test.go b/pkg/apis/experimental/v1/defaults_test.go index afc00939fa2..1b38b90ba11 100644 --- a/pkg/apis/experimental/v1/defaults_test.go +++ b/pkg/apis/experimental/v1/defaults_test.go @@ -189,6 +189,43 @@ func TestSetDefaultDeployment(t *testing.T) { } } +func TestSetDefaultJob(t *testing.T) { + expected := &Job{ + Spec: JobSpec{ + Completions: newInt(1), + Parallelism: newInt(2), + }, + } + tests := []*Job{ + {}, + { + Spec: JobSpec{ + Completions: newInt(1), + }, + }, + { + Spec: JobSpec{ + Parallelism: newInt(2), + }, + }, + } + + for _, original := range tests { + obj2 := roundTrip(t, runtime.Object(original)) + got, ok := obj2.(*Job) + if !ok { + t.Errorf("unexpected object: %v", got) + t.FailNow() + } + if 
*got.Spec.Completions != *expected.Spec.Completions { + t.Errorf("got different completions than expected: %d %d", *got.Spec.Completions, *expected.Spec.Completions) + } + if *got.Spec.Parallelism != *expected.Spec.Parallelism { + t.Errorf("got different parallelism than expected: %d %d", *got.Spec.Parallelism, *expected.Spec.Parallelism) + } + } +} + func roundTrip(t *testing.T, obj runtime.Object) runtime.Object { data, err := v1.Codec.Encode(obj) if err != nil { diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 6b9db47db8a..284d713b75a 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -344,3 +344,55 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E err = fmt.Errorf("Could not find endpoints for service: %v", svc.Name) return } + +// StoreToJobLister gives a store List and Exists methods. The store must contain only Jobs. +type StoreToJobLister struct { + Store +} + +// Exists checks if the given job exists in the store. +func (s *StoreToJobLister) Exists(job *experimental.Job) (bool, error) { + _, exists, err := s.Store.Get(job) + if err != nil { + return false, err + } + return exists, nil +} + +// StoreToJobLister lists all jobs in the store. +func (s *StoreToJobLister) List() (jobs []experimental.Job, err error) { + for _, c := range s.Store.List() { + jobs = append(jobs, *(c.(*experimental.Job))) + } + return jobs, nil +} + +// GetPodControllers returns a list of jobs managing a pod. Returns an error only if no matching jobs are found. +func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []experimental.Job, err error) { + var selector labels.Selector + var job experimental.Job + + if len(pod.Labels) == 0 { + err = fmt.Errorf("No jobs found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + job = *m.(*experimental.Job) + if job.Namespace != pod.Namespace { + continue + } + labelSet := labels.Set(job.Spec.Selector) + selector = labels.Set(job.Spec.Selector).AsSelector() + + // Job with a nil or empty selector match nothing + if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + jobs = append(jobs, job) + } + if len(jobs) == 0 { + err = fmt.Errorf("Could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/pkg/client/unversioned/testclient/fake_jobs.go b/pkg/client/unversioned/testclient/fake_jobs.go new file mode 100644 index 00000000000..63a43903fa2 --- /dev/null +++ b/pkg/client/unversioned/testclient/fake_jobs.go @@ -0,0 +1,86 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testclient + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/watch" +) + +// FakeJobs implements JobInterface. 
Meant to be embedded into a struct to get a default +// implementation. This makes faking out just the method you want to test easier. +type FakeJobs struct { + Fake *FakeExperimental + Namespace string +} + +func (c *FakeJobs) Get(name string) (*experimental.Job, error) { + obj, err := c.Fake.Invokes(NewGetAction("jobs", c.Namespace, name), &experimental.Job{}) + if obj == nil { + return nil, err + } + + return obj.(*experimental.Job), err +} + +func (c *FakeJobs) List(label labels.Selector, fields fields.Selector) (*experimental.JobList, error) { + obj, err := c.Fake.Invokes(NewListAction("jobs", c.Namespace, label, nil), &experimental.JobList{}) + if obj == nil { + return nil, err + } + + return obj.(*experimental.JobList), err +} + +func (c *FakeJobs) Create(job *experimental.Job) (*experimental.Job, error) { + obj, err := c.Fake.Invokes(NewCreateAction("jobs", c.Namespace, job), job) + if obj == nil { + return nil, err + } + + return obj.(*experimental.Job), err +} + +func (c *FakeJobs) Update(job *experimental.Job) (*experimental.Job, error) { + obj, err := c.Fake.Invokes(NewUpdateAction("jobs", c.Namespace, job), job) + if obj == nil { + return nil, err + } + + return obj.(*experimental.Job), err +} + +func (c *FakeJobs) Delete(name string, options *api.DeleteOptions) error { + _, err := c.Fake.Invokes(NewDeleteAction("jobs", c.Namespace, name), &experimental.Job{}) + return err +} + +func (c *FakeJobs) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { + return c.Fake.InvokesWatch(NewWatchAction("jobs", c.Namespace, label, field, resourceVersion)) +} + +func (c *FakeJobs) UpdateStatus(job *experimental.Job) (result *experimental.Job, err error) { + obj, err := c.Fake.Invokes(NewUpdateSubresourceAction("jobs", "status", c.Namespace, job), job) + if obj == nil { + return nil, err + } + + return obj.(*experimental.Job), err +} diff --git a/pkg/client/unversioned/testclient/testclient.go b/pkg/client/unversioned/testclient/testclient.go index 6dde3db6703..1dddfbc70c8 100644 --- a/pkg/client/unversioned/testclient/testclient.go +++ b/pkg/client/unversioned/testclient/testclient.go @@ -263,5 +263,5 @@ func (c *FakeExperimental) Scales(namespace string) client.ScaleInterface { } func (c *FakeExperimental) Jobs(namespace string) client.JobInterface { - panic("unimplemented") + return &FakeJobs{Fake: c, Namespace: namespace} } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index e3a5dbb1213..9b97146fa2f 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -26,7 +26,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/validation" - "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -213,10 +212,10 @@ func NewControllerExpectations() *ControllerExpectations { // PodControlInterface is an interface that knows how to add or delete pods // created as an interface to allow testing. type PodControlInterface interface { - // CreateReplica creates new replicated pods according to the spec. - CreateReplica(namespace string, controller *api.ReplicationController) error - // CreateReplicaOnNode creates a new pod according to the spec on the specified node. - CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error + // CreatePods creates new pods according to the spec. 
+ CreatePods(namespace string, template *api.PodTemplateSpec, object runtime.Object) error + // CreatePodsOnNode creates a new pod accorting to the spec on the specified node. + CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error // DeletePod deletes the pod identified by podID. DeletePod(namespace string, podID string) error } @@ -227,7 +226,7 @@ type RealPodControl struct { Recorder record.EventRecorder } -func getReplicaLabelSet(template *api.PodTemplateSpec) labels.Set { +func getPodsLabelSet(template *api.PodTemplateSpec) labels.Set { desiredLabels := make(labels.Set) for k, v := range template.Labels { desiredLabels[k] = v @@ -235,7 +234,7 @@ func getReplicaLabelSet(template *api.PodTemplateSpec) labels.Set { return desiredLabels } -func getReplicaAnnotationSet(template *api.PodTemplateSpec, object runtime.Object) (labels.Set, error) { +func getPodsAnnotationSet(template *api.PodTemplateSpec, object runtime.Object) (labels.Set, error) { desiredAnnotations := make(labels.Set) for k, v := range template.Annotations { desiredAnnotations[k] = v @@ -254,7 +253,7 @@ func getReplicaAnnotationSet(template *api.PodTemplateSpec, object runtime.Objec return desiredAnnotations, nil } -func getReplicaPrefix(controllerName string) string { +func getPodsPrefix(controllerName string) string { // use the dash (if the name isn't too long) to make the pod name a bit prettier prefix := fmt.Sprintf("%s-", controllerName) if ok, _ := validation.ValidatePodName(prefix, true); !ok { @@ -263,44 +262,25 @@ func getReplicaPrefix(controllerName string) string { return prefix } -func (r RealPodControl) CreateReplica(namespace string, controller *api.ReplicationController) error { - desiredLabels := getReplicaLabelSet(controller.Spec.Template) - desiredAnnotations, err := getReplicaAnnotationSet(controller.Spec.Template, controller) - if err != nil { - return err - } - prefix := getReplicaPrefix(controller.Name) - - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Labels: desiredLabels, - Annotations: desiredAnnotations, - GenerateName: prefix, - }, - } - if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil { - return fmt.Errorf("unable to convert pod template: %v", err) - } - if labels.Set(pod.Labels).AsSelector().Empty() { - return fmt.Errorf("unable to create pod replica, no labels") - } - if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil { - r.Recorder.Eventf(controller, "FailedCreate", "Error creating: %v", err) - return fmt.Errorf("unable to create pod replica: %v", err) - } else { - glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name) - r.Recorder.Eventf(controller, "SuccessfulCreate", "Created pod: %v", newPod.Name) - } - return nil +func (r RealPodControl) CreatePods(namespace string, template *api.PodTemplateSpec, object runtime.Object) error { + return r.createPods("", namespace, template, object) } -func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error { - desiredLabels := getReplicaLabelSet(ds.Spec.Template) - desiredAnnotations, err := getReplicaAnnotationSet(ds.Spec.Template, ds) +func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error { + return r.createPods(nodeName, namespace, template, object) +} + +func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error { + desiredLabels 
:= getPodsLabelSet(template) + desiredAnnotations, err := getPodsAnnotationSet(template, object) if err != nil { return err } - prefix := getReplicaPrefix(ds.Name) + meta, err := api.ObjectMetaFor(object) + if err != nil { + return fmt.Errorf("object does not have ObjectMeta, %v", err) + } + prefix := getPodsPrefix(meta.Name) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -309,22 +289,22 @@ func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.D GenerateName: prefix, }, } - if err := api.Scheme.Convert(&ds.Spec.Template.Spec, &pod.Spec); err != nil { + if len(nodeName) != 0 { + pod.Spec.NodeName = nodeName + } + if err := api.Scheme.Convert(&template.Spec, &pod.Spec); err != nil { return fmt.Errorf("unable to convert pod template: %v", err) } - // if a pod does not have labels then it cannot be controlled by any controller if labels.Set(pod.Labels).AsSelector().Empty() { - return fmt.Errorf("unable to create pod replica, no labels") + return fmt.Errorf("unable to create pods, no labels") } - pod.Spec.NodeName = nodeName if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil { - r.Recorder.Eventf(ds, "FailedCreate", "Error creating: %v", err) - return fmt.Errorf("unable to create pod replica: %v", err) + r.Recorder.Eventf(object, "FailedCreate", "Error creating: %v", err) + return fmt.Errorf("unable to create pods: %v", err) } else { - glog.V(4).Infof("Controller %v created pod %v", ds.Name, newPod.Name) - r.Recorder.Eventf(ds, "SuccessfulCreate", "Created pod: %v", newPod.Name) + glog.V(4).Infof("Controller %v created pod %v", meta.Name, newPod.Name) + r.Recorder.Eventf(object, "SuccessfulCreate", "Created pod: %v", newPod.Name) } - return nil } diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index 07574f2500a..84d2c7e844f 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -180,7 +180,7 @@ func TestControllerExpectations(t *testing.T) { } } -func TestCreateReplica(t *testing.T) { +func TestCreatePods(t *testing.T) { ns := api.NamespaceDefault body := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}}) fakeHandler := util.FakeHandler{ @@ -199,7 +199,7 @@ func TestCreateReplica(t *testing.T) { controllerSpec := newReplicationController(1) // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template - podControl.CreateReplica(ns, controllerSpec) + podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec) expectedPod := api.Pod{ ObjectMeta: api.ObjectMeta{ diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index e6db35b14d1..d0b93427a2a 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -371,7 +371,7 @@ func (dsc *DaemonSetsController) manage(ds *experimental.DaemonSet) { glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods) for i := range nodesNeedingDaemonPods { - if err := dsc.podControl.CreateReplicaOnNode(ds.Namespace, ds, nodesNeedingDaemonPods[i]); err != nil { + if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[i], ds.Namespace, ds.Spec.Template, ds); err != nil { glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.CreationObserved(dsKey) util.HandleError(err) diff --git a/pkg/controller/daemon/controller_test.go 
b/pkg/controller/daemon/controller_test.go index 1ab55b2b72c..a0ce8c830b5 100644 --- a/pkg/controller/daemon/controller_test.go +++ b/pkg/controller/daemon/controller_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" ) @@ -38,7 +39,7 @@ var ( ) type FakePodControl struct { - daemonSet []experimental.DaemonSet + podSpec []api.PodTemplateSpec deletePodName []string lock sync.Mutex err error @@ -48,17 +49,17 @@ func init() { api.ForTesting_ReferencesAllowBlankSelfLinks = true } -func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error { +func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { return nil } -func (f *FakePodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error { +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() if f.err != nil { return f.err } - f.daemonSet = append(f.daemonSet, *ds) + f.podSpec = append(f.podSpec, *spec) return nil } @@ -75,7 +76,7 @@ func (f *FakePodControl) clear() { f.lock.Lock() defer f.lock.Unlock() f.deletePodName = []string{} - f.daemonSet = []experimental.DaemonSet{} + f.podSpec = []api.PodTemplateSpec{} } func newDaemonSet(name string) *experimental.DaemonSet { @@ -164,8 +165,8 @@ func newTestController() (*DaemonSetsController, *FakePodControl) { } func validateSyncDaemonSets(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) { - if len(fakePodControl.daemonSet) != expectedCreates { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSet)) + if len(fakePodControl.podSpec) != expectedCreates { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.podSpec)) } if len(fakePodControl.deletePodName) != expectedDeletes { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName)) diff --git a/pkg/controller/job/doc.go b/pkg/controller/job/doc.go new file mode 100644 index 00000000000..9c569bfc08c --- /dev/null +++ b/pkg/controller/job/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package job contains logic for watching and synchronizing jobs. +package job diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go new file mode 100644 index 00000000000..e5bb4d76346 --- /dev/null +++ b/pkg/controller/job/job_controller.go @@ -0,0 +1,452 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "reflect" + "sort" + "sync" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/replication" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +type JobManager struct { + kubeClient client.Interface + podControl controller.PodControlInterface + + // To allow injection of updateJob for testing. + updateHandler func(job *experimental.Job) error + syncHandler func(jobKey string) error + // podStoreSynced returns true if the pod store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + podStoreSynced func() bool + + // A TTLCache of pod creates/deletes each rc expects to see + expectations controller.ControllerExpectationsInterface + + // A store of job, populated by the jobController + jobStore cache.StoreToJobLister + // Watches changes to all jobs + jobController *framework.Controller + + // A store of pods, populated by the podController + podStore cache.StoreToPodLister + // Watches changes to all pods + podController *framework.Controller + + // Jobs that need to be updated + queue *workqueue.Type +} + +func NewJobManager(kubeClient client.Interface) *JobManager { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + + jm := &JobManager{ + kubeClient: kubeClient, + podControl: controller.RealPodControl{ + KubeClient: kubeClient, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job"}), + }, + expectations: controller.NewControllerExpectations(), + queue: workqueue.New(), + } + + jm.jobStore.Store, jm.jobController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return jm.kubeClient.Experimental().Jobs(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return jm.kubeClient.Experimental().Jobs(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &experimental.Job{}, + replicationcontroller.FullControllerResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: jm.enqueueController, + UpdateFunc: func(old, cur interface{}) { + job := cur.(*experimental.Job) + for _, c := range job.Status.Conditions { + if c.Type == experimental.JobComplete && c.Status == api.ConditionTrue { + return + } + } + jm.enqueueController(cur) + }, + DeleteFunc: jm.enqueueController, + }, + ) + + jm.podStore.Store, jm.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return jm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), 
fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return jm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + replicationcontroller.PodRelistPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: jm.addPod, + UpdateFunc: jm.updatePod, + DeleteFunc: jm.deletePod, + }, + ) + + jm.updateHandler = jm.updateJob + jm.syncHandler = jm.syncJob + jm.podStoreSynced = jm.podController.HasSynced + return jm +} + +// Run the main goroutine responsible for watching and syncing jobs. +func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) { + defer util.HandleCrash() + go jm.jobController.Run(stopCh) + go jm.podController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(jm.worker, time.Second, stopCh) + } + <-stopCh + glog.Infof("Shutting down Job Manager") + jm.queue.ShutDown() +} + +// getPodJob returns the job managing the given pod. +func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job { + jobs, err := jm.jobStore.GetPodJobs(pod) + if err != nil { + glog.V(4).Infof("No jobs found for pod %v, job manager will avoid syncing", pod.Name) + return nil + } + // TODO: add sorting and rethink the overlaping controllers, internally and with RCs + return &jobs[0] +} + +// When a pod is created, enqueue the controller that manages it and update it's expectations. +func (jm *JobManager) addPod(obj interface{}) { + pod := obj.(*api.Pod) + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + jm.deletePod(pod) + return + } + if job := jm.getPodJob(pod); job != nil { + jobKey, err := controller.KeyFunc(job) + if err != nil { + glog.Errorf("Couldn't get key for job %#v: %v", job, err) + return + } + jm.expectations.CreationObserved(jobKey) + jm.enqueueController(job) + } +} + +// When a pod is updated, figure out what job/s manage it and wake them up. +// If the labels of the pod have changed we need to awaken both the old +// and new job. old and cur must be *api.Pod types. +func (jm *JobManager) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + // A periodic relist will send update events for all known pods. + return + } + curPod := cur.(*api.Pod) + if curPod.DeletionTimestamp != nil { + // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, + // and after such time has passed, the kubelet actually deletes it from the store. We receive an update + // for modification of the deletion timestamp and expect an job to create more pods asap, not wait + // until the kubelet actually deletes the pod. + jm.deletePod(curPod) + return + } + if job := jm.getPodJob(curPod); job != nil { + jm.enqueueController(job) + } + oldPod := old.(*api.Pod) + // Only need to get the old job if the labels changed. + if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { + // If the old and new job are the same, the first one that syncs + // will set expectations preventing any damage from the second. + if oldJob := jm.getPodJob(oldPod); oldJob != nil { + jm.enqueueController(oldJob) + } + } +} + +// When a pod is deleted, enqueue the job that manages the pod and update its expectations. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. 
+func (jm *JobManager) deletePod(obj interface{}) { + pod, ok := obj.(*api.Pod) + + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new job will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a job recreates a pod", obj, controller.ExpectationsTimeout) + return + } + pod, ok = tombstone.Obj.(*api.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before job recreates a pod", obj, controller.ExpectationsTimeout) + return + } + } + if job := jm.getPodJob(pod); job != nil { + jobKey, err := controller.KeyFunc(job) + if err != nil { + glog.Errorf("Couldn't get key for job %#v: %v", job, err) + return + } + jm.expectations.DeletionObserved(jobKey) + jm.enqueueController(job) + } +} + +// obj could be an *experimental.Job, or a DeletionFinalStateUnknown marker item. +func (jm *JobManager) enqueueController(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + + // TODO: Handle overlapping controllers better. Either disallow them at admission time or + // deterministically avoid syncing controllers that fight over pods. Currently, we only + // ensure that the same controller is synced for a given pod. When we periodically relist + // all controllers there will still be some replica instability. One way to handle this is + // by querying the store for all controllers that this rc overlaps, as well as all + // controllers that overlap this rc, and sorting them. + jm.queue.Add(key) +} + +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (jm *JobManager) worker() { + for { + func() { + key, quit := jm.queue.Get() + if quit { + return + } + defer jm.queue.Done(key) + err := jm.syncHandler(key.(string)) + if err != nil { + glog.Errorf("Error syncing job: %v", err) + } + }() + } +} + +// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning +// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked +// concurrently with the same key. +func (jm *JobManager) syncJob(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime)) + }() + + obj, exists, err := jm.jobStore.Store.GetByKey(key) + if !exists { + glog.Infof("Job has been deleted %v", key) + jm.expectations.DeleteExpectations(key) + return nil + } + if err != nil { + glog.Infof("Unable to retrieve job %v from store: %v", key, err) + jm.queue.Add(key) + return err + } + job := *obj.(*experimental.Job) + if !jm.podStoreSynced() { + // Sleep so we give the pod reflector goroutine a chance to run. 
+ time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod) + glog.Infof("Waiting for pods controller to sync, requeuing job %v", job.Name) + jm.enqueueController(&job) + return nil + } + + // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in + // and update the expectations after we've retrieved active pods from the store. If a new pod enters + // the store after we've checked the expectation, the job sync is just deferred till the next relist. + jobKey, err := controller.KeyFunc(&job) + if err != nil { + glog.Errorf("Couldn't get key for job %#v: %v", job, err) + return err + } + jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey) + podList, err := jm.podStore.Pods(job.Namespace).List(labels.Set(job.Spec.Selector).AsSelector()) + if err != nil { + glog.Errorf("Error getting pods for job %q: %v", key, err) + jm.queue.Add(key) + return err + } + + activePods := controller.FilterActivePods(podList.Items) + active := len(activePods) + successful, unsuccessful := getStatus(jobKey, job.Spec.Template.Spec.RestartPolicy, podList.Items) + if jobNeedsSync { + active = jm.manageJob(activePods, successful, unsuccessful, &job) + } + completions := successful + if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever { + completions += unsuccessful + } + if completions == *job.Spec.Completions { + job.Status.Conditions = append(job.Status.Conditions, newCondition()) + } + + // no need to update the job if the status hasn't changed since last time + if job.Status.Active != active || job.Status.Successful != successful || job.Status.Unsuccessful != unsuccessful { + job.Status.Active = active + job.Status.Successful = successful + job.Status.Unsuccessful = unsuccessful + + if err := jm.updateHandler(&job); err != nil { + glog.V(2).Infof("Failed to update job %v, requeuing", job.Name) + jm.enqueueController(&job) + } + } + return nil +} + +func newCondition() experimental.JobCondition { + return experimental.JobCondition{ + Type: experimental.JobComplete, + Status: api.ConditionTrue, + LastProbeTime: util.Now(), + LastTransitionTime: util.Now(), + } +} + +func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (successful, unsuccessful int) { + successful = filterPods(pods, api.PodSucceeded) + if restartPolicy == api.RestartPolicyNever { + unsuccessful = filterPods(pods, api.PodFailed) + } + return +} + +func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int { + active := len(activePods) + parallelism := *job.Spec.Parallelism + jobKey, err := controller.KeyFunc(job) + if err != nil { + glog.Errorf("Couldn't get key for job %#v: %v", job, err) + return 0 + } + + if active > parallelism { + diff := active - parallelism + jm.expectations.ExpectDeletions(jobKey, diff) + glog.V(2).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) + // Sort the pods in the order such that not-ready < ready, unscheduled + // < scheduled, and pending < running. This ensures that we delete pods + // in the earlier stages whenever possible. 
+ sort.Sort(controller.ActivePods(activePods)) + + active -= diff + wait := sync.WaitGroup{} + wait.Add(diff) + for i := 0; i < diff; i++ { + go func(ix int) { + defer wait.Done() + if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name); err != nil { + // Decrement the expected number of deletes because the informer won't observe this deletion + glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q", jobKey) + jm.expectations.DeletionObserved(jobKey) + util.HandleError(err) + active++ + } + }(i) + } + wait.Wait() + + } else if active < parallelism { + // how many executions are left to run + diff := *job.Spec.Completions - successful + // for RestartPolicyNever we need to count unsuccessful pods as well + if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever { + diff -= unsuccessful + } + // limit to parallelism and count active pods as well + if diff > parallelism { + diff = parallelism + } + diff -= active + jm.expectations.ExpectCreations(jobKey, diff) + glog.V(2).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff) + + active += diff + wait := sync.WaitGroup{} + wait.Add(diff) + for i := 0; i < diff; i++ { + go func() { + defer wait.Done() + if err := jm.podControl.CreatePods(job.Namespace, job.Spec.Template, job); err != nil { + // Decrement the expected number of creates because the informer won't observe this pod + glog.V(2).Infof("Failed creation, decrementing expectations for controller %q", jobKey) + jm.expectations.CreationObserved(jobKey) + util.HandleError(err) + active-- + } + }() + } + wait.Wait() + } + + return active +} + +func (jm *JobManager) updateJob(job *experimental.Job) error { + _, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job) + return err +} + +// filterPods returns pods based on their phase. +func filterPods(pods []api.Pod, phase api.PodPhase) int { + result := 0 + for i := range pods { + if phase == pods[i].Status.Phase { + result++ + } + } + return result +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go new file mode 100644 index 00000000000..e01586b64ed --- /dev/null +++ b/pkg/controller/job/job_controller_test.go @@ -0,0 +1,553 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "fmt" + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/experimental" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/watch" +) + +// Give each test that starts a background controller up to 1/2 a second. +// Since we need to start up a goroutine to test watch, this routine needs +// to get cpu before the test can complete. 
If the test is starved of cpu, +// the watch test will take up to 1/2 a second before timing out. +const controllerTimeout = 500 * time.Millisecond + +var alwaysReady = func() bool { return true } + +type FakePodControl struct { + podSpec []api.PodTemplateSpec + deletePodName []string + lock sync.Mutex + err error +} + +func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.podSpec = append(f.podSpec, *spec) + return nil +} + +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error { + return nil +} + +func (f *FakePodControl) DeletePod(namespace string, podName string) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.deletePodName = append(f.deletePodName, podName) + return nil +} +func (f *FakePodControl) clear() { + f.lock.Lock() + defer f.lock.Unlock() + f.deletePodName = []string{} + f.podSpec = []api.PodTemplateSpec{} +} + +func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *experimental.Job { + return &experimental.Job{ + ObjectMeta: api.ObjectMeta{ + Name: "foobar", + Namespace: api.NamespaceDefault, + }, + Spec: experimental.JobSpec{ + Parallelism: ¶llelism, + Completions: &completions, + Selector: map[string]string{"foo": "bar"}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + RestartPolicy: restartPolicy, + Containers: []api.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + } +} + +func getKey(job *experimental.Job, t *testing.T) string { + if key, err := controller.KeyFunc(job); err != nil { + t.Errorf("Unexpected error getting key for job %v: %v", job.Name, err) + return "" + } else { + return key + } +} + +// create count pods with the given phase for the given job +func newPodList(count int, status api.PodPhase, job *experimental.Job) []api.Pod { + pods := []api.Pod{} + for i := 0; i < count; i++ { + newPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("pod-%v", util.Now().UnixNano()), + Labels: job.Spec.Selector, + Namespace: job.Namespace, + }, + Status: api.PodStatus{Phase: status}, + } + pods = append(pods, newPod) + } + return pods +} + +func TestControllerSyncJob(t *testing.T) { + testCases := map[string]struct { + // job setup + parallelism int + completions int + restartPolicy api.RestartPolicy + + // pod setup + podControllerError error + activePods int + successfulPods int + unsuccessfulPods int + + // expectations + expectedCreations int + expectedDeletions int + expectedActive int + expectedSuccessful int + expectedUnsuccessful int + expectedComplete bool + }{ + "job start": { + 2, 5, api.RestartPolicyOnFailure, + nil, 0, 0, 0, + 2, 0, 2, 0, 0, false, + }, + "correct # of pods": { + 2, 5, api.RestartPolicyOnFailure, + nil, 2, 0, 0, + 0, 0, 2, 0, 0, false, + }, + "too few active pods": { + 2, 5, api.RestartPolicyOnFailure, + nil, 1, 1, 0, + 1, 0, 2, 1, 0, false, + }, + "too few active pods, with controller error": { + 2, 5, api.RestartPolicyOnFailure, + fmt.Errorf("Fake error"), 1, 1, 0, + 0, 0, 1, 1, 0, false, + }, + "too many active pods": { + 2, 5, api.RestartPolicyOnFailure, + nil, 3, 0, 0, + 0, 1, 2, 0, 0, false, + }, + "too many active pods, with controller error": { + 2, 5, api.RestartPolicyOnFailure, + fmt.Errorf("Fake error"), 3, 0, 0, + 0, 0, 3, 0, 0, false, + }, + 
"failed pod and OnFailure restart policy": { + 2, 5, api.RestartPolicyOnFailure, + nil, 1, 1, 1, + 1, 0, 2, 1, 0, false, + }, + "failed pod and Never restart policy": { + 2, 5, api.RestartPolicyNever, + nil, 1, 1, 1, + 1, 0, 2, 1, 1, false, + }, + "job finish and OnFailure restart policy": { + 2, 5, api.RestartPolicyOnFailure, + nil, 0, 5, 0, + 0, 0, 0, 5, 0, true, + }, + "job finish and Never restart policy": { + 2, 5, api.RestartPolicyNever, + nil, 0, 2, 3, + 0, 0, 0, 2, 3, true, + }, + "more active pods than completions": { + 2, 5, api.RestartPolicyOnFailure, + nil, 10, 0, 0, + 0, 8, 2, 0, 0, false, + }, + "status change": { + 2, 5, api.RestartPolicyOnFailure, + nil, 2, 2, 0, + 0, 0, 2, 2, 0, false, + }, + } + + for name, tc := range testCases { + // job manager setup + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewJobManager(client) + fakePodControl := FakePodControl{err: tc.podControllerError} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + var actual *experimental.Job + manager.updateHandler = func(job *experimental.Job) error { + actual = job + return nil + } + + // job & pods setup + job := newJob(tc.parallelism, tc.completions, tc.restartPolicy) + manager.jobStore.Store.Add(job) + for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { + manager.podStore.Store.Add(&pod) + } + for _, pod := range newPodList(tc.successfulPods, api.PodSucceeded, job) { + manager.podStore.Store.Add(&pod) + } + for _, pod := range newPodList(tc.unsuccessfulPods, api.PodFailed, job) { + manager.podStore.Store.Add(&pod) + } + + // run + err := manager.syncJob(getKey(job, t)) + if err != nil { + t.Errorf("%s: unexpected error when syncing jobs %v", err) + } + + // validate created/deleted pods + if len(fakePodControl.podSpec) != tc.expectedCreations { + t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.podSpec)) + } + if len(fakePodControl.deletePodName) != tc.expectedDeletions { + t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.deletePodName)) + } + // validate status + if actual.Status.Active != tc.expectedActive { + t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active) + } + if actual.Status.Successful != tc.expectedSuccessful { + t.Errorf("%s: unexpected number of successful pods. Expected %d, saw %d\n", name, tc.expectedSuccessful, actual.Status.Successful) + } + if actual.Status.Unsuccessful != tc.expectedUnsuccessful { + t.Errorf("%s: unexpected number of unsuccessful pods. Expected %d, saw %d\n", name, tc.expectedUnsuccessful, actual.Status.Unsuccessful) + } + // validate conditions + if tc.expectedComplete { + completed := false + for _, v := range actual.Status.Conditions { + if v.Type == experimental.JobComplete && v.Status == api.ConditionTrue { + completed = true + break + } + } + if !completed { + t.Errorf("%s: expected completion condition. 
Got %v", name, actual.Status.Conditions) + } + } + } +} + +func TestSyncJobDeleted(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewJobManager(client) + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.updateHandler = func(job *experimental.Job) error { return nil } + job := newJob(2, 2, api.RestartPolicyOnFailure) + err := manager.syncJob(getKey(job, t)) + if err != nil { + t.Errorf("Unexpected error when syncing jobs %v", err) + } + if len(fakePodControl.podSpec) != 0 { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.podSpec)) + } + if len(fakePodControl.deletePodName) != 0 { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.deletePodName)) + } +} + +func TestSyncJobUpdateRequeue(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewJobManager(client) + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.updateHandler = func(job *experimental.Job) error { return fmt.Errorf("Fake error") } + job := newJob(2, 2, api.RestartPolicyOnFailure) + manager.jobStore.Store.Add(job) + err := manager.syncJob(getKey(job, t)) + if err != nil { + t.Errorf("Unxpected error when syncing jobs, got %v", err) + } + ch := make(chan interface{}) + go func() { + item, _ := manager.queue.Get() + ch <- item + }() + select { + case key := <-ch: + expectedKey := getKey(job, t) + if key != expectedKey { + t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key) + } + case <-time.After(controllerTimeout): + manager.queue.ShutDown() + t.Errorf("Expected to find a job in the queue, found none.") + } +} + +func TestJobPodLookup(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewJobManager(client) + manager.podStoreSynced = alwaysReady + testCases := []struct { + job *experimental.Job + pod *api.Pod + + expectedName string + }{ + // pods without labels don't match any job + { + job: &experimental.Job{ + ObjectMeta: api.ObjectMeta{Name: "basic"}, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}, + }, + expectedName: "", + }, + // matching labels, different namespace + { + job: &experimental.Job{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: experimental.JobSpec{ + Selector: map[string]string{"foo": "bar"}, + }, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo2", + Namespace: "ns", + Labels: map[string]string{"foo": "bar"}, + }, + }, + expectedName: "", + }, + // matching ns and labels returns + { + job: &experimental.Job{ + ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"}, + Spec: experimental.JobSpec{ + Selector: map[string]string{"foo": "bar"}, + }, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo3", + Namespace: "ns", + Labels: map[string]string{"foo": "bar"}, + }, + }, + expectedName: "bar", + }, + } + for _, tc := range testCases { + manager.jobStore.Add(tc.job) + if job := manager.getPodJob(tc.pod); job != nil { + if tc.expectedName != job.Name { + t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) + } + } else if tc.expectedName != "" { + t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name) + } + } +} + +type FakeJobExpectations 
struct { + *controller.ControllerExpectations + satisfied bool + expSatisfied func() +} + +func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { + fe.expSatisfied() + return fe.satisfied +} + +// TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods +// and checking expectations. +func TestSyncJobExpectations(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewJobManager(client) + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.updateHandler = func(job *experimental.Job) error { return nil } + + job := newJob(2, 2, api.RestartPolicyOnFailure) + manager.jobStore.Store.Add(job) + pods := newPodList(2, api.PodPending, job) + manager.podStore.Store.Add(&pods[0]) + + manager.expectations = FakeJobExpectations{ + controller.NewControllerExpectations(), true, func() { + // If we check active pods before checking expectataions, the job + // will create a new replica because it doesn't see this pod, but + // has fulfilled its expectations. + manager.podStore.Store.Add(&pods[1]) + }, + } + manager.syncJob(getKey(job, t)) + if len(fakePodControl.podSpec) != 0 { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.podSpec)) + } + if len(fakePodControl.deletePodName) != 0 { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.deletePodName)) + } +} + +type FakeWatcher struct { + w *watch.FakeWatcher + *testclient.Fake +} + +func TestWatchJobs(t *testing.T) { + fakeWatch := watch.NewFake() + client := &testclient.Fake{} + client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) + manager := NewJobManager(client) + manager.podStoreSynced = alwaysReady + + var testJob experimental.Job + received := make(chan string) + + // The update sent through the fakeWatcher should make its way into the workqueue, + // and eventually into the syncHandler. + manager.syncHandler = func(key string) error { + + obj, exists, err := manager.jobStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find job under key %v", key) + } + job := *obj.(*experimental.Job) + if !api.Semantic.DeepDerivative(job, testJob) { + t.Errorf("Expected %#v, but got %#v", testJob, job) + } + received <- key + return nil + } + // Start only the job watcher and the workqueue, send a watch event, + // and make sure it hits the sync method. + stopCh := make(chan struct{}) + defer close(stopCh) + go manager.jobController.Run(stopCh) + go util.Until(manager.worker, 10*time.Millisecond, stopCh) + + // We're sending new job to see if it reaches syncHandler. + testJob.Name = "foo" + fakeWatch.Add(&testJob) + select { + case <-received: + case <-time.After(controllerTimeout): + t.Errorf("Expected 1 call but got 0") + } + + // We're sending fake finished job, to see if it reaches syncHandler - it should not, + // since we're filtering out finished jobs. 
+ testJobv2 := experimental.Job{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: experimental.JobStatus{ + Conditions: []experimental.JobCondition{{ + Type: experimental.JobComplete, + Status: api.ConditionTrue, + LastProbeTime: util.Now(), + LastTransitionTime: util.Now(), + }}, + }, + } + fakeWatch.Modify(&testJobv2) + + select { + case <-received: + t.Errorf("Expected 0 call but got 1") + case <-time.After(controllerTimeout): + } +} + +func TestWatchPods(t *testing.T) { + fakeWatch := watch.NewFake() + client := &testclient.Fake{} + client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) + manager := NewJobManager(client) + manager.podStoreSynced = alwaysReady + + // Put one job and one pod into the store + testJob := newJob(2, 2, api.RestartPolicyOnFailure) + manager.jobStore.Store.Add(testJob) + received := make(chan string) + // The pod update sent through the fakeWatcher should figure out the managing job and + // send it into the syncHandler. + manager.syncHandler = func(key string) error { + + obj, exists, err := manager.jobStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find job under key %v", key) + } + job := obj.(*experimental.Job) + if !api.Semantic.DeepDerivative(job, testJob) { + t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) + } + close(received) + return nil + } + // Start only the pod watcher and the workqueue, send a watch event, + // and make sure it hits the sync method for the right job. + stopCh := make(chan struct{}) + defer close(stopCh) + go manager.podController.Run(stopCh) + go util.Until(manager.worker, 10*time.Millisecond, stopCh) + + pods := newPodList(1, api.PodRunning, testJob) + testPod := pods[0] + testPod.Status.Phase = api.PodFailed + fakeWatch.Add(&testPod) + + select { + case <-received: + case <-time.After(controllerTimeout): + t.Errorf("Expected 1 call but got 0") + } +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 082abe963e0..d9398a45fe3 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -350,7 +350,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re for i := 0; i < diff; i++ { go func() { defer wait.Done() - if err := rm.podControl.CreateReplica(rc.Namespace, rc); err != nil { + if err := rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc); err != nil { // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) rm.expectations.CreationObserved(rcKey) diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 404099a0271..089f838190a 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -27,7 +27,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" - "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" @@ -42,7 +41,7 @@ import ( ) type FakePodControl struct { - controllerSpec []api.ReplicationController + controllerSpec []api.PodTemplateSpec deletePodName []string lock sync.Mutex err error @@ -60,7 +59,7 @@ func init() { 
api.ForTesting_ReferencesAllowBlankSelfLinks = true } -func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error { +func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() if f.err != nil { @@ -70,7 +69,7 @@ func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationCo return nil } -func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *experimental.DaemonSet, nodeName string) error { +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error { return nil } @@ -87,7 +86,7 @@ func (f *FakePodControl) clear() { f.lock.Lock() defer f.lock.Unlock() f.deletePodName = []string{} - f.controllerSpec = []api.ReplicationController{} + f.controllerSpec = []api.PodTemplateSpec{} } func getKey(rc *api.ReplicationController, t *testing.T) string {
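
A note on the core sizing logic in pkg/controller/job/job_controller.go: JobManager.manageJob scales the number of running pods toward Spec.Parallelism while never scheduling more work than Spec.Completions leaves outstanding, and under RestartPolicyNever failed pods also count as finished work. The standalone sketch below restates that arithmetic as a pure function so it can be checked against the table-driven cases in TestControllerSyncJob. podDelta and its parameter names are illustrative, not part of the patch, and the sketch deliberately ignores the expectations bookkeeping and the concurrent create/delete goroutines (it also simply returns 0 where the patch creates nothing but still adjusts its local active count).

package main

import "fmt"

// podDelta is an illustrative restatement of the scale-up/scale-down decision
// in JobManager.manageJob. A positive result means "create this many pods",
// a negative result means "delete this many pods".
// restartNever mirrors job.Spec.Template.Spec.RestartPolicy == RestartPolicyNever,
// where failed pods also count toward the remaining work.
func podDelta(parallelism, completions, active, successful, unsuccessful int, restartNever bool) int {
	switch {
	case active > parallelism:
		// Too many runners: delete the surplus.
		return -(active - parallelism)
	case active < parallelism:
		// How many executions are still left to run.
		remaining := completions - successful
		if restartNever {
			// Failed pods are not retried, so they also count as finished.
			remaining -= unsuccessful
		}
		// Never run more than parallelism pods at once, and account for the
		// pods that are already active.
		if remaining > parallelism {
			remaining = parallelism
		}
		if remaining < active {
			// Nothing new to create (simplification of the patch's behaviour).
			return 0
		}
		return remaining - active
	default:
		return 0
	}
}

func main() {
	fmt.Println(podDelta(2, 5, 0, 0, 0, false)) // "job start": create 2
	fmt.Println(podDelta(2, 5, 3, 0, 0, false)) // "too many active pods": prints -1, i.e. delete 1
	fmt.Println(podDelta(2, 5, 1, 1, 1, true))  // "failed pod and Never restart policy": create 1
}

These outputs match the expectedCreations/expectedDeletions columns for the corresponding cases in TestControllerSyncJob.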
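
On the new defaulting funcs in pkg/apis/experimental/v1/defaults.go: Completions and Parallelism are pointer fields so that an unset value (nil) can be told apart from an explicit 0, and the local completions/parallelism variables exist only to produce addressable values. Below is a minimal, self-contained illustration of the same pattern using stand-in types rather than the real v1 API; newInt mirrors the helper that defaults_test.go relies on.

package main

import "fmt"

// JobSpecV1 stands in for the v1 JobSpec: optional numeric fields are
// pointers so an unset field (nil) is distinguishable from an explicit 0.
type JobSpecV1 struct {
	Parallelism *int
	Completions *int
}

// newInt mirrors the helper assumed by defaults_test.go: it returns a
// pointer to a freshly allocated int holding the given value.
func newInt(v int) *int { return &v }

// applyJobDefaults mirrors the defaulting func added in defaults.go:
// Completions defaults to 1 and Parallelism to 2 when left unset.
func applyJobDefaults(spec *JobSpecV1) {
	if spec.Completions == nil {
		spec.Completions = newInt(1)
	}
	if spec.Parallelism == nil {
		spec.Parallelism = newInt(2)
	}
}

func main() {
	spec := JobSpecV1{Completions: newInt(3)} // Parallelism left unset
	applyJobDefaults(&spec)
	fmt.Println(*spec.Completions, *spec.Parallelism) // 3 2
}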
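
Finally, StoreToJobLister.GetPodJobs in pkg/client/cache/listers.go resolves which jobs manage a pod purely by namespace plus label-selector matching, with a nil or empty selector matching nothing. The sketch below reproduces that lookup with plain maps instead of labels.Set/labels.Selector; jobRef, matches, and jobsForPod are illustrative stand-ins, and the error returned when no job matches is omitted.

package main

import "fmt"

// jobRef is a stand-in for experimental.Job carrying only what the lookup needs.
type jobRef struct {
	Name, Namespace string
	Selector        map[string]string // job.Spec.Selector
}

// matches reports whether every key/value of the selector is present in the
// pod's labels; an empty selector matches nothing, as in GetPodJobs.
func matches(selector, podLabels map[string]string) bool {
	if len(selector) == 0 {
		return false
	}
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

// jobsForPod mirrors StoreToJobLister.GetPodJobs: same namespace plus a
// non-empty selector that matches the pod's labels.
func jobsForPod(jobs []jobRef, podNamespace string, podLabels map[string]string) []jobRef {
	var out []jobRef
	for _, j := range jobs {
		if j.Namespace != podNamespace {
			continue
		}
		if matches(j.Selector, podLabels) {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	jobs := []jobRef{{Name: "bar", Namespace: "ns", Selector: map[string]string{"foo": "bar"}}}
	fmt.Println(jobsForPod(jobs, "ns", map[string]string{"foo": "bar"})) // [{bar ns map[foo:bar]}]
}

This is the same behaviour TestJobPodLookup exercises: a pod without labels matches no job, a matching selector in a different namespace matches nothing, and a matching selector in the same namespace returns the job.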