diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 86b46f29ade..a45fa1d990e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -406,7 +406,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(sharedInformers.Pods().Informer(), client("job-controller")). + go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")). Run(int(s.ConcurrentJobSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/pkg/client/cache/BUILD b/pkg/client/cache/BUILD index a706defb161..eba219a415d 100644 --- a/pkg/client/cache/BUILD +++ b/pkg/client/cache/BUILD @@ -40,7 +40,6 @@ go_library( "//pkg/api/meta:go_default_library", "//pkg/api/unversioned:go_default_library", "//pkg/apis/apps:go_default_library", - "//pkg/apis/batch:go_default_library", "//pkg/apis/certificates:go_default_library", "//pkg/apis/extensions:go_default_library", "//pkg/apis/policy:go_default_library", @@ -84,7 +83,6 @@ go_test( "//pkg/api/testapi:go_default_library", "//pkg/api/unversioned:go_default_library", "//pkg/apimachinery/registered:go_default_library", - "//pkg/apis/batch:go_default_library", "//pkg/apis/extensions:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/restclient:go_default_library", diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 3b818d7e024..bc65876e036 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -25,7 +25,6 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/apps" - "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/certificates" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/policy" @@ -294,56 +293,6 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E return } -// StoreToJobLister gives a store List and Exists methods. The store must contain only Jobs. -type StoreToJobLister struct { - Store -} - -// Exists checks if the given job exists in the store. -func (s *StoreToJobLister) Exists(job *batch.Job) (bool, error) { - _, exists, err := s.Store.Get(job) - if err != nil { - return false, err - } - return exists, nil -} - -// StoreToJobLister lists all jobs in the store. -func (s *StoreToJobLister) List() (jobs batch.JobList, err error) { - for _, c := range s.Store.List() { - jobs.Items = append(jobs.Items, *(c.(*batch.Job))) - } - return jobs, nil -} - -// GetPodJobs returns a list of jobs managing a pod. Returns an error only if no matching jobs are found. -func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) { - var selector labels.Selector - var job batch.Job - - if len(pod.Labels) == 0 { - err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) - return - } - - for _, m := range s.Store.List() { - job = *m.(*batch.Job) - if job.Namespace != pod.Namespace { - continue - } - - selector, _ = unversioned.LabelSelectorAsSelector(job.Spec.Selector) - if !selector.Matches(labels.Set(pod.Labels)) { - continue - } - jobs = append(jobs, job) - } - if len(jobs) == 0 { - err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - return -} - // Typed wrapper around a store of PersistentVolumes type StoreToPVFetcher struct { Store diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 597362eb800..6b7e5937478 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -22,7 +22,6 @@ import ( "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/sets" @@ -517,177 +516,6 @@ func TestStoreToDaemonSetLister(t *testing.T) { } } -func TestStoreToJobLister(t *testing.T) { - store := NewStore(MetaNamespaceKeyFunc) - lister := StoreToJobLister{store} - testCases := []struct { - inJobs []*batch.Job - list func() ([]batch.Job, error) - outJobNames sets.String - expectErr bool - msg string - }{ - // Basic listing - { - inJobs: []*batch.Job{ - {ObjectMeta: api.ObjectMeta{Name: "basic"}}, - }, - list: func() ([]batch.Job, error) { - list, err := lister.List() - return list.Items, err - }, - outJobNames: sets.NewString("basic"), - msg: "basic listing failed", - }, - // Listing multiple jobs - { - inJobs: []*batch.Job{ - {ObjectMeta: api.ObjectMeta{Name: "basic"}}, - {ObjectMeta: api.ObjectMeta{Name: "complex"}}, - {ObjectMeta: api.ObjectMeta{Name: "complex2"}}, - }, - list: func() ([]batch.Job, error) { - list, err := lister.List() - return list.Items, err - }, - outJobNames: sets.NewString("basic", "complex", "complex2"), - msg: "listing multiple jobs failed", - }, - // No pod labels - { - inJobs: []*batch.Job{ - { - ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, - Spec: batch.JobSpec{ - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{"foo": "baz"}, - }, - }, - }, - }, - list: func() ([]batch.Job, error) { - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "pod", Namespace: "ns"}, - } - return lister.GetPodJobs(pod) - }, - outJobNames: sets.NewString(), - expectErr: true, - msg: "listing jobs failed when pod has no labels: expected error, got none", - }, - // No Job selectors - { - inJobs: []*batch.Job{ - { - ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, - }, - }, - list: func() ([]batch.Job, error) { - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "pod", - Namespace: "ns", - Labels: map[string]string{"foo": "bar"}, - }, - } - return lister.GetPodJobs(pod) - }, - outJobNames: sets.NewString(), - expectErr: true, - msg: "listing jobs failed when job has no selector: expected error, got none", - }, - // Matching labels to selectors and namespace - { - inJobs: []*batch.Job{ - { - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: batch.JobSpec{ - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{"foo": "bar"}, - }, - }, - }, - { - ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"}, - Spec: batch.JobSpec{ - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{"foo": "bar"}, - }, - }, - }, - }, - list: func() ([]batch.Job, error) { - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "pod", - Labels: map[string]string{"foo": "bar"}, - Namespace: "ns", - }, - } - return lister.GetPodJobs(pod) - }, - outJobNames: sets.NewString("bar"), - msg: "listing jobs with namespace and selector failed", - }, - // Matching labels to selectors and namespace, error case - { - inJobs: []*batch.Job{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "foo"}, - Spec: batch.JobSpec{ - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{"foo": "bar"}, - }, - }, - }, - { - ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "bar"}, - Spec: batch.JobSpec{ - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{"foo": "bar"}, - }, - }, - }, - }, - list: func() ([]batch.Job, error) { - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "pod", - Labels: map[string]string{"foo": "bar"}, - Namespace: "baz", - }, - } - return lister.GetPodJobs(pod) - }, - expectErr: true, - msg: "listing jobs with namespace and selector failed: expected error, got none", - }, - } - for _, c := range testCases { - for _, r := range c.inJobs { - store.Add(r) - } - - Jobs, err := c.list() - if err != nil && c.expectErr { - continue - } else if c.expectErr { - t.Errorf("%v", c.msg) - continue - } else if err != nil { - t.Errorf("Unexpected error %#v", err) - continue - } - JobNames := make([]string, len(Jobs)) - for ix := range Jobs { - JobNames[ix] = Jobs[ix].Name - } - if !c.outJobNames.HasAll(JobNames...) || len(JobNames) != len(c.outJobNames) { - t.Errorf("%v : expected %v, got %v", c.msg, JobNames, c.outJobNames) - } - } -} - func TestStoreToPodLister(t *testing.T) { // We test with and without a namespace index, because StoreToPodLister has // special logic to work on namespaces even when no namespace index is diff --git a/pkg/client/listers/batch/internalversion/BUILD b/pkg/client/listers/batch/internalversion/BUILD index dc44e1136ac..3a4eacba3d1 100644 --- a/pkg/client/listers/batch/internalversion/BUILD +++ b/pkg/client/listers/batch/internalversion/BUILD @@ -15,13 +15,31 @@ go_library( srcs = [ "expansion_generated.go", "job.go", + "job_expansion.go", "scheduledjob.go", ], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", "//pkg/api/errors:go_default_library", + "//pkg/api/unversioned:go_default_library", "//pkg/apis/batch:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/labels:go_default_library", ], ) + +go_test( + name = "go_default_test", + srcs = ["job_test.go"], + library = "go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/unversioned:go_default_library", + "//pkg/apis/batch:go_default_library", + "//pkg/client/cache:go_default_library", + "//pkg/labels:go_default_library", + "//pkg/util/sets:go_default_library", + ], +) diff --git a/pkg/client/listers/batch/internalversion/expansion_generated.go b/pkg/client/listers/batch/internalversion/expansion_generated.go index 94e8290c45c..de9fc938b73 100644 --- a/pkg/client/listers/batch/internalversion/expansion_generated.go +++ b/pkg/client/listers/batch/internalversion/expansion_generated.go @@ -18,14 +18,6 @@ limitations under the License. package internalversion -// JobListerExpansion allows custom methods to be added to -// JobLister. -type JobListerExpansion interface{} - -// JobNamespaceListerExpansion allows custom methods to be added to -// JobNamespaeLister. -type JobNamespaceListerExpansion interface{} - // ScheduledJobListerExpansion allows custom methods to be added to // ScheduledJobLister. type ScheduledJobListerExpansion interface{} diff --git a/pkg/client/listers/batch/internalversion/job_expansion.go b/pkg/client/listers/batch/internalversion/job_expansion.go new file mode 100644 index 00000000000..e73ce519219 --- /dev/null +++ b/pkg/client/listers/batch/internalversion/job_expansion.go @@ -0,0 +1,64 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internalversion + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/labels" +) + +// JobListerExpansion allows custom methods to be added to +// JobLister. +type JobListerExpansion interface { + // GetPodJobs returns a list of jobs managing a pod. An error is returned only + // if no matching jobs are found. + GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) +} + +// GetPodJobs returns a list of jobs managing a pod. An error is returned only +// if no matching jobs are found. +func (l *jobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) + return + } + + var list []*batch.Job + list, err = l.Jobs(pod.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, job := range list { + selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector) + if !selector.Matches(labels.Set(pod.Labels)) { + continue + } + jobs = append(jobs, *job) + } + if len(jobs) == 0 { + err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// JobNamespaceListerExpansion allows custom methods to be added to +// JobNamespaceLister. +type JobNamespaceListerExpansion interface{} diff --git a/pkg/client/listers/batch/internalversion/job_test.go b/pkg/client/listers/batch/internalversion/job_test.go new file mode 100644 index 00000000000..f0d99b3184b --- /dev/null +++ b/pkg/client/listers/batch/internalversion/job_test.go @@ -0,0 +1,219 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internalversion + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/sets" +) + +func TestJobLister(t *testing.T) { + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + lister := NewJobLister(indexer) + testCases := []struct { + inJobs []*batch.Job + list func() ([]*batch.Job, error) + outJobNames sets.String + expectErr bool + msg string + }{ + // Basic listing + { + inJobs: []*batch.Job{ + {ObjectMeta: api.ObjectMeta{Name: "basic"}}, + }, + list: func() ([]*batch.Job, error) { + list, err := lister.List(labels.Everything()) + return list, err + }, + outJobNames: sets.NewString("basic"), + msg: "basic listing failed", + }, + // Listing multiple jobs + { + inJobs: []*batch.Job{ + {ObjectMeta: api.ObjectMeta{Name: "basic"}}, + {ObjectMeta: api.ObjectMeta{Name: "complex"}}, + {ObjectMeta: api.ObjectMeta{Name: "complex2"}}, + }, + list: func() ([]*batch.Job, error) { + list, err := lister.List(labels.Everything()) + return list, err + }, + outJobNames: sets.NewString("basic", "complex", "complex2"), + msg: "listing multiple jobs failed", + }, + // No pod labels + { + inJobs: []*batch.Job{ + { + ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, + Spec: batch.JobSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "baz"}, + }, + }, + }, + }, + list: func() ([]*batch.Job, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "pod", Namespace: "ns"}, + } + podJobs, err := lister.GetPodJobs(pod) + jobs := make([]*batch.Job, 0, len(podJobs)) + for i := range podJobs { + jobs = append(jobs, &podJobs[i]) + } + return jobs, err + }, + outJobNames: sets.NewString(), + expectErr: true, + msg: "listing jobs failed when pod has no labels: expected error, got none", + }, + // No Job selectors + { + inJobs: []*batch.Job{ + { + ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, + }, + }, + list: func() ([]*batch.Job, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: map[string]string{"foo": "bar"}, + }, + } + podJobs, err := lister.GetPodJobs(pod) + jobs := make([]*batch.Job, 0, len(podJobs)) + for i := range podJobs { + jobs = append(jobs, &podJobs[i]) + } + return jobs, err + }, + outJobNames: sets.NewString(), + expectErr: true, + msg: "listing jobs failed when job has no selector: expected error, got none", + }, + // Matching labels to selectors and namespace + { + inJobs: []*batch.Job{ + { + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: batch.JobSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"}, + Spec: batch.JobSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + }, + }, + }, + list: func() ([]*batch.Job, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod", + Labels: map[string]string{"foo": "bar"}, + Namespace: "ns", + }, + } + podJobs, err := lister.GetPodJobs(pod) + jobs := make([]*batch.Job, 0, len(podJobs)) + for i := range podJobs { + jobs = append(jobs, &podJobs[i]) + } + return jobs, err + }, + outJobNames: sets.NewString("bar"), + msg: "listing jobs with namespace and selector failed", + }, + // Matching labels to selectors and namespace, error case + { + inJobs: []*batch.Job{ + { + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "foo"}, + Spec: batch.JobSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "bar"}, + Spec: batch.JobSpec{ + Selector: &unversioned.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + }, + }, + }, + list: func() ([]*batch.Job, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod", + Labels: map[string]string{"foo": "bar"}, + Namespace: "baz", + }, + } + podJobs, err := lister.GetPodJobs(pod) + jobs := make([]*batch.Job, 0, len(podJobs)) + for i := range podJobs { + jobs = append(jobs, &podJobs[i]) + } + return jobs, err + }, + expectErr: true, + msg: "listing jobs with namespace and selector failed: expected error, got none", + }, + } + for _, c := range testCases { + for _, r := range c.inJobs { + indexer.Add(r) + } + + Jobs, err := c.list() + if err != nil && c.expectErr { + continue + } else if c.expectErr { + t.Errorf("%v", c.msg) + continue + } else if err != nil { + t.Errorf("Unexpected error %#v", err) + continue + } + JobNames := make([]string, len(Jobs)) + for ix := range Jobs { + JobNames[ix] = Jobs[ix].Name + } + if !c.outJobNames.HasAll(JobNames...) || len(JobNames) != len(c.outJobNames) { + t.Errorf("%v : expected %v, got %v", c.msg, JobNames, c.outJobNames) + } + } +} diff --git a/pkg/controller/informers/BUILD b/pkg/controller/informers/BUILD index 8288c7a7318..2b48aea4204 100644 --- a/pkg/controller/informers/BUILD +++ b/pkg/controller/informers/BUILD @@ -13,6 +13,7 @@ load( go_library( name = "go_default_library", srcs = [ + "batch.go", "core.go", "extensions.go", "factory.go", @@ -24,11 +25,13 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/api/unversioned:go_default_library", + "//pkg/apis/batch:go_default_library", "//pkg/apis/extensions:go_default_library", "//pkg/apis/rbac:go_default_library", "//pkg/apis/storage:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/listers/batch/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/runtime:go_default_library", "//pkg/watch:go_default_library", diff --git a/pkg/controller/informers/batch.go b/pkg/controller/informers/batch.go new file mode 100644 index 00000000000..3143d555e70 --- /dev/null +++ b/pkg/controller/informers/batch.go @@ -0,0 +1,83 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "reflect" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + batchinternallisters "k8s.io/kubernetes/pkg/client/listers/batch/internalversion" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// JobInformer is type of SharedIndexInformer which watches and lists all jobs. +// Interface provides constructor for informer and lister for jobs +type JobInformer interface { + Informer() cache.SharedIndexInformer + Lister() batchinternallisters.JobLister +} + +type jobInformer struct { + *sharedInformerFactory +} + +// Informer checks whether jobInformer exists in sharedInformerFactory and if not, it creates new informer of type +// jobInformer and connects it to sharedInformerFactory +func (f *jobInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&batch.Job{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = NewJobInformer(f.client, f.defaultResync) + f.informers[informerType] = informer + + return informer +} + +// NewJobInformer returns a SharedIndexInformer that lists and watches all jobs +func NewJobInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Batch().Jobs(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Batch().Jobs(api.NamespaceAll).Watch(options) + }, + }, + &batch.Job{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + return sharedIndexInformer +} + +// Lister returns lister for jobInformer +func (f *jobInformer) Lister() batchinternallisters.JobLister { + informer := f.Informer() + return batchinternallisters.NewJobLister(informer.GetIndexer()) +} diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index daf4e03ee4d..084e2e21b6a 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -54,6 +54,8 @@ type SharedInformerFactory interface { Roles() RoleInformer StorageClasses() StorageClassInformer + + Jobs() JobInformer } type sharedInformerFactory struct { @@ -158,3 +160,8 @@ func (f *sharedInformerFactory) LimitRanges() LimitRangeInformer { func (f *sharedInformerFactory) StorageClasses() StorageClassInformer { return &storageClassInformer{sharedInformerFactory: f} } + +// Jobs returns a SharedIndexInformer that lists and watches all storage jobs +func (f *sharedInformerFactory) Jobs() JobInformer { + return &jobInformer{sharedInformerFactory: f} +} diff --git a/pkg/controller/informers/generic.go b/pkg/controller/informers/generic.go index fa34fb48f61..fc6a6e0052b 100644 --- a/pkg/controller/informers/generic.go +++ b/pkg/controller/informers/generic.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/rbac" "k8s.io/kubernetes/pkg/client/cache" @@ -67,6 +68,9 @@ func (f *sharedInformerFactory) ForResource(resource unversioned.GroupResource) return &genericInformer{resource: resource, informer: f.RoleBindings().Informer()}, nil case rbac.Resource("roles"): return &genericInformer{resource: resource, informer: f.Roles().Informer()}, nil + + case batch.Resource("jobs"): + return &genericInformer{resource: resource, informer: f.Jobs().Informer()}, nil } return nil, fmt.Errorf("no informer found for %v", resource) diff --git a/pkg/controller/job/BUILD b/pkg/controller/job/BUILD index b7ada0fa7a7..20782b81d2e 100644 --- a/pkg/controller/job/BUILD +++ b/pkg/controller/job/BUILD @@ -20,21 +20,20 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/api/errors:go_default_library", "//pkg/api/unversioned:go_default_library", "//pkg/apis/batch:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", + "//pkg/client/listers/batch/internalversion:go_default_library", "//pkg/client/record:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/informers:go_default_library", - "//pkg/controller/replication:go_default_library", - "//pkg/runtime:go_default_library", "//pkg/util/metrics:go_default_library", "//pkg/util/runtime:go_default_library", "//pkg/util/wait:go_default_library", "//pkg/util/workqueue:go_default_library", - "//pkg/watch:go_default_library", "//vendor:github.com/golang/glog", ], ) @@ -52,11 +51,13 @@ go_test( "//pkg/api/unversioned:go_default_library", "//pkg/apimachinery/registered:go_default_library", "//pkg/apis/batch:go_default_library", + "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/restclient:go_default_library", "//pkg/client/testing/core:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/informers:go_default_library", "//pkg/util/rand:go_default_library", "//pkg/util/wait:go_default_library", "//pkg/watch:go_default_library", diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 0d6f746767f..049b0969733 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -24,21 +24,20 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + batchinternallisters "k8s.io/kubernetes/pkg/client/listers/batch/internalversion" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/informers" - replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" - "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -47,27 +46,21 @@ type JobController struct { kubeClient clientset.Interface podControl controller.PodControlInterface - // internalPodInformer is used to hold a personal informer. If we're using - // a normal shared informer, then the informer will be started for us. If - // we have a personal informer, we must start it ourselves. If you start - // the controller using NewJobController(passing SharedInformer), this - // will be null - internalPodInformer cache.SharedInformer - // To allow injection of updateJobStatus for testing. updateHandler func(job *batch.Job) error syncHandler func(jobKey string) error // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. - podStoreSynced func() bool + podStoreSynced cache.InformerSynced + // jobStoreSynced returns true if the job store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + jobStoreSynced cache.InformerSynced // A TTLCache of pod creates/deletes each rc expects to see expectations controller.ControllerExpectationsInterface - // A store of job, populated by the jobController - jobStore cache.StoreToJobLister - // Watches changes to all jobs - jobController *cache.Controller + // A store of jobs + jobLister batchinternallisters.JobLister // A store of pods, populated by the podController podStore cache.StoreToPodLister @@ -78,7 +71,7 @@ type JobController struct { recorder record.EventRecorder } -func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *JobController { +func NewJobController(podInformer cache.SharedIndexInformer, jobInformer informers.JobInformer, kubeClient clientset.Interface) *JobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -99,28 +92,17 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}), } - jm.jobStore.Store, jm.jobController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options) - }, + jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: jm.enqueueController, + UpdateFunc: func(old, cur interface{}) { + if job := cur.(*batch.Job); !IsJobFinished(job) { + jm.enqueueController(job) + } }, - &batch.Job{}, - // TODO: Can we have much longer period here? - replicationcontroller.FullControllerResyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: jm.enqueueController, - UpdateFunc: func(old, cur interface{}) { - if job := cur.(*batch.Job); !IsJobFinished(job) { - jm.enqueueController(job) - } - }, - DeleteFunc: jm.enqueueController, - }, - ) + DeleteFunc: jm.enqueueController, + }) + jm.jobLister = jobInformer.Lister() + jm.jobStoreSynced = jobInformer.Informer().HasSynced podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, @@ -135,39 +117,26 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse return jm } -func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { - podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) - jm := NewJobController(podInformer, kubeClient) - jm.internalPodInformer = podInformer - - return jm -} - // Run the main goroutine responsible for watching and syncing jobs. func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer jm.queue.ShutDown() - if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced) { + if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced, jm.jobStoreSynced) { return } - go jm.jobController.Run(stopCh) for i := 0; i < workers; i++ { go wait.Until(jm.worker, time.Second, stopCh) } - if jm.internalPodInformer != nil { - go jm.internalPodInformer.Run(stopCh) - } - <-stopCh glog.Infof("Shutting down Job Manager") } // getPodJob returns the job managing the given pod. func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job { - jobs, err := jm.jobStore.GetPodJobs(pod) + jobs, err := jm.jobLister.GetPodJobs(pod) if err != nil { glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name) return nil @@ -315,16 +284,23 @@ func (jm *JobController) syncJob(key string) error { glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := jm.jobStore.Store.GetByKey(key) - if !exists { - glog.V(4).Infof("Job has been deleted: %v", key) - jm.expectations.DeleteExpectations(key) - return nil - } + ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } - job := *obj.(*batch.Job) + if len(ns) == 0 || len(name) == 0 { + return fmt.Errorf("invalid job key %q: either namespace or name is missing", key) + } + sharedJob, err := jm.jobLister.Jobs(ns).Get(name) + if err != nil { + if errors.IsNotFound(err) { + glog.V(4).Infof("Job has been deleted: %v", key) + jm.expectations.DeleteExpectations(key) + return nil + } + return err + } + job := *sharedJob // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 1229ecd205c..2dcd6687275 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -18,7 +18,6 @@ package job import ( "fmt" - "reflect" "testing" "time" @@ -26,11 +25,13 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/util/rand" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" @@ -86,6 +87,13 @@ func getKey(job *batch.Job, t *testing.T) string { } } +func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) { + sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) + jm := NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), kubeClient) + + return jm, sharedInformers +} + // create count pods with the given phase for the given job func newPodList(count int32, status api.PodPhase, job *batch.Job) []api.Pod { pods := []api.Pod{} @@ -220,10 +228,11 @@ func TestControllerSyncJob(t *testing.T) { for name, tc := range testCases { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControllerError} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateHandler = func(job *batch.Job) error { actual = job @@ -236,18 +245,19 @@ func TestControllerSyncJob(t *testing.T) { now := unversioned.Now() job.DeletionTimestamp = &now } - manager.jobStore.Store.Add(job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job) + podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer() for _, pod := range newPodList(tc.pendingPods, api.PodPending, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } // run @@ -322,10 +332,11 @@ func TestSyncJobPastDeadline(t *testing.T) { for name, tc := range testCases { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateHandler = func(job *batch.Job) error { actual = job @@ -337,15 +348,16 @@ func TestSyncJobPastDeadline(t *testing.T) { job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0) job.Status.StartTime = &start - manager.jobStore.Store.Add(job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job) + podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer() for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) { - manager.podStore.Indexer.Add(&pod) + podIndexer.Add(&pod) } // run @@ -392,10 +404,11 @@ func getCondition(job *batch.Job, condition batch.JobConditionType) bool { func TestSyncPastDeadlineJobFinished(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady var actual *batch.Job manager.updateHandler = func(job *batch.Job) error { actual = job @@ -408,7 +421,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0) job.Status.StartTime = &start job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline")) - manager.jobStore.Store.Add(job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) @@ -426,23 +439,23 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { func TestSyncJobComplete(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady job := newJob(1, 1) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) - manager.jobStore.Store.Add(job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) if err != nil { t.Fatalf("Unexpected error when syncing jobs %v", err) } - uncastJob, _, err := manager.jobStore.Store.Get(job) + actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name) if err != nil { t.Fatalf("Unexpected error when trying to get job from the store: %v", err) } - actual := uncastJob.(*batch.Job) // Verify that after syncing a complete job, the conditions are the same. if got, expected := len(actual.Status.Conditions), 1; got != expected { t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got) @@ -451,10 +464,11 @@ func TestSyncJobComplete(t *testing.T) { func TestSyncJobDeleted(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, _ := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } job := newJob(2, 2) err := manager.syncJob(getKey(job, t)) @@ -471,17 +485,18 @@ func TestSyncJobDeleted(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady updateError := fmt.Errorf("Update error") manager.updateHandler = func(job *batch.Job) error { manager.queue.AddRateLimited(getKey(job, t)) return updateError } job := newJob(2, 2) - manager.jobStore.Store.Add(job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) if err == nil || err != updateError { t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err) @@ -496,8 +511,9 @@ func TestSyncJobUpdateRequeue(t *testing.T) { func TestJobPodLookup(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady testCases := []struct { job *batch.Job pod *api.Pod @@ -560,7 +576,7 @@ func TestJobPodLookup(t *testing.T) { }, } for _, tc := range testCases { - manager.jobStore.Add(tc.job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(tc.job) if job := manager.getPodJob(tc.pod); job != nil { if tc.expectedName != job.Name { t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) @@ -586,23 +602,25 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { // and checking expectations. func TestSyncJobExpectations(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } job := newJob(2, 2) - manager.jobStore.Store.Add(job) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job) pods := newPodList(2, api.PodPending, job) - manager.podStore.Indexer.Add(&pods[0]) + podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer() + podIndexer.Add(&pods[0]) manager.expectations = FakeJobExpectations{ controller.NewControllerExpectations(), true, func() { // If we check active pods before checking expectataions, the job // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. - manager.podStore.Indexer.Add(&pods[1]) + podIndexer.Add(&pods[1]) }, } manager.syncJob(getKey(job, t)) @@ -618,8 +636,9 @@ func TestWatchJobs(t *testing.T) { clientset := fake.NewSimpleClientset() fakeWatch := watch.NewFake() clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady var testJob batch.Job received := make(chan struct{}) @@ -627,28 +646,30 @@ func TestWatchJobs(t *testing.T) { // The update sent through the fakeWatcher should make its way into the workqueue, // and eventually into the syncHandler. manager.syncHandler = func(key string) error { - - obj, exists, err := manager.jobStore.Store.GetByKey(key) - if !exists || err != nil { - t.Errorf("Expected to find job under key %v", key) + defer close(received) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + t.Errorf("Error getting namespace/name from key %v: %v", key, err) } - job, ok := obj.(*batch.Job) - if !ok { - t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj) + job, err := manager.jobLister.Jobs(ns).Get(name) + if err != nil || job == nil { + t.Errorf("Expected to find job under key %v: %v", key, err) + return nil } if !api.Semantic.DeepDerivative(*job, testJob) { t.Errorf("Expected %#v, but got %#v", testJob, *job) } - close(received) return nil } // Start only the job watcher and the workqueue, send a watch event, // and make sure it hits the sync method. stopCh := make(chan struct{}) defer close(stopCh) + sharedInformerFactory.Start(stopCh) go manager.Run(1, stopCh) // We're sending new job to see if it reaches syncHandler. + testJob.Namespace = "bar" testJob.Name = "foo" fakeWatch.Add(&testJob) t.Log("Waiting for job to reach syncHandler") @@ -660,26 +681,23 @@ func TestWatchPods(t *testing.T) { clientset := fake.NewSimpleClientset(testJob) fakeWatch := watch.NewFake() clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady // Put one job and one pod into the store - manager.jobStore.Store.Add(testJob) + sharedInformerFactory.Jobs().Informer().GetIndexer().Add(testJob) received := make(chan struct{}) // The pod update sent through the fakeWatcher should figure out the managing job and // send it into the syncHandler. manager.syncHandler = func(key string) error { - obj, exists, err := manager.jobStore.Store.GetByKey(key) - if !exists || err != nil { - t.Errorf("Expected to find job under key %v", key) - close(received) - return nil + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + t.Errorf("Error getting namespace/name from key %v: %v", key, err) } - job, ok := obj.(*batch.Job) - if !ok { - t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj) - close(received) - return nil + job, err := manager.jobLister.Jobs(ns).Get(name) + if err != nil { + t.Errorf("Expected to find job under key %v: %v", key, err) } if !api.Semantic.DeepDerivative(job, testJob) { t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) @@ -693,7 +711,7 @@ func TestWatchPods(t *testing.T) { // and make sure it hits the sync method for the right job. stopCh := make(chan struct{}) defer close(stopCh) - go manager.internalPodInformer.Run(stopCh) + go sharedInformerFactory.Pods().Informer().Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(1, api.PodRunning, testJob) diff --git a/test/test_owners.csv b/test/test_owners.csv index 11a376fc0d7..ca69e11988f 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -535,6 +535,7 @@ k8s.io/kubernetes/pkg/auth/handlers,liggitt,0 k8s.io/kubernetes/pkg/client/cache,xiang90,1 k8s.io/kubernetes/pkg/client/chaosclient,deads2k,1 k8s.io/kubernetes/pkg/client/leaderelection,xiang90,1 +k8s.io/kubernetes/pkg/client/listers/batch/internalversion,mqliang,0 k8s.io/kubernetes/pkg/client/record,karlkfi,1 k8s.io/kubernetes/pkg/client/restclient,kargakis,1 k8s.io/kubernetes/pkg/client/retry,caesarxuchao,1