Switch to JobLister

This commit is contained in:
Andy Goldstein 2016-11-01 15:57:49 -04:00
parent 8e10413468
commit 8c923faf74
16 changed files with 506 additions and 345 deletions

View File

@ -406,7 +406,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
if containsResource(resources, "jobs") { if containsResource(resources, "jobs") {
glog.Infof("Starting job controller") glog.Infof("Starting job controller")
go job.NewJobController(sharedInformers.Pods().Informer(), client("job-controller")). go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")).
Run(int(s.ConcurrentJobSyncs), wait.NeverStop) Run(int(s.ConcurrentJobSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }

View File

@ -40,7 +40,6 @@ go_library(
"//pkg/api/meta:go_default_library", "//pkg/api/meta:go_default_library",
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/apis/apps:go_default_library", "//pkg/apis/apps:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/certificates:go_default_library", "//pkg/apis/certificates:go_default_library",
"//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions:go_default_library",
"//pkg/apis/policy:go_default_library", "//pkg/apis/policy:go_default_library",
@ -84,7 +83,6 @@ go_test(
"//pkg/api/testapi:go_default_library", "//pkg/api/testapi:go_default_library",
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/apimachinery/registered:go_default_library", "//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/restclient:go_default_library", "//pkg/client/restclient:go_default_library",

View File

@ -25,7 +25,6 @@ import (
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/certificates" "k8s.io/kubernetes/pkg/apis/certificates"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/policy" "k8s.io/kubernetes/pkg/apis/policy"
@ -294,56 +293,6 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E
return return
} }
// StoreToJobLister gives a store List and Exists methods. The store must contain only Jobs.
type StoreToJobLister struct {
Store
}
// Exists checks if the given job exists in the store.
func (s *StoreToJobLister) Exists(job *batch.Job) (bool, error) {
_, exists, err := s.Store.Get(job)
if err != nil {
return false, err
}
return exists, nil
}
// StoreToJobLister lists all jobs in the store.
func (s *StoreToJobLister) List() (jobs batch.JobList, err error) {
for _, c := range s.Store.List() {
jobs.Items = append(jobs.Items, *(c.(*batch.Job)))
}
return jobs, nil
}
// GetPodJobs returns a list of jobs managing a pod. Returns an error only if no matching jobs are found.
func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) {
var selector labels.Selector
var job batch.Job
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
return
}
for _, m := range s.Store.List() {
job = *m.(*batch.Job)
if job.Namespace != pod.Namespace {
continue
}
selector, _ = unversioned.LabelSelectorAsSelector(job.Spec.Selector)
if !selector.Matches(labels.Set(pod.Labels)) {
continue
}
jobs = append(jobs, job)
}
if len(jobs) == 0 {
err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// Typed wrapper around a store of PersistentVolumes // Typed wrapper around a store of PersistentVolumes
type StoreToPVFetcher struct { type StoreToPVFetcher struct {
Store Store

View File

@ -22,7 +22,6 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors" apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@ -517,177 +516,6 @@ func TestStoreToDaemonSetLister(t *testing.T) {
} }
} }
func TestStoreToJobLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
lister := StoreToJobLister{store}
testCases := []struct {
inJobs []*batch.Job
list func() ([]batch.Job, error)
outJobNames sets.String
expectErr bool
msg string
}{
// Basic listing
{
inJobs: []*batch.Job{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func() ([]batch.Job, error) {
list, err := lister.List()
return list.Items, err
},
outJobNames: sets.NewString("basic"),
msg: "basic listing failed",
},
// Listing multiple jobs
{
inJobs: []*batch.Job{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
{ObjectMeta: api.ObjectMeta{Name: "complex"}},
{ObjectMeta: api.ObjectMeta{Name: "complex2"}},
},
list: func() ([]batch.Job, error) {
list, err := lister.List()
return list.Items, err
},
outJobNames: sets.NewString("basic", "complex", "complex2"),
msg: "listing multiple jobs failed",
},
// No pod labels
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "baz"},
},
},
},
},
list: func() ([]batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod", Namespace: "ns"},
}
return lister.GetPodJobs(pod)
},
outJobNames: sets.NewString(),
expectErr: true,
msg: "listing jobs failed when pod has no labels: expected error, got none",
},
// No Job selectors
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func() ([]batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod",
Namespace: "ns",
Labels: map[string]string{"foo": "bar"},
},
}
return lister.GetPodJobs(pod)
},
outJobNames: sets.NewString(),
expectErr: true,
msg: "listing jobs failed when job has no selector: expected error, got none",
},
// Matching labels to selectors and namespace
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
},
list: func() ([]batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod",
Labels: map[string]string{"foo": "bar"},
Namespace: "ns",
},
}
return lister.GetPodJobs(pod)
},
outJobNames: sets.NewString("bar"),
msg: "listing jobs with namespace and selector failed",
},
// Matching labels to selectors and namespace, error case
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "foo"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "bar"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
},
list: func() ([]batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod",
Labels: map[string]string{"foo": "bar"},
Namespace: "baz",
},
}
return lister.GetPodJobs(pod)
},
expectErr: true,
msg: "listing jobs with namespace and selector failed: expected error, got none",
},
}
for _, c := range testCases {
for _, r := range c.inJobs {
store.Add(r)
}
Jobs, err := c.list()
if err != nil && c.expectErr {
continue
} else if c.expectErr {
t.Errorf("%v", c.msg)
continue
} else if err != nil {
t.Errorf("Unexpected error %#v", err)
continue
}
JobNames := make([]string, len(Jobs))
for ix := range Jobs {
JobNames[ix] = Jobs[ix].Name
}
if !c.outJobNames.HasAll(JobNames...) || len(JobNames) != len(c.outJobNames) {
t.Errorf("%v : expected %v, got %v", c.msg, JobNames, c.outJobNames)
}
}
}
func TestStoreToPodLister(t *testing.T) { func TestStoreToPodLister(t *testing.T) {
// We test with and without a namespace index, because StoreToPodLister has // We test with and without a namespace index, because StoreToPodLister has
// special logic to work on namespaces even when no namespace index is // special logic to work on namespaces even when no namespace index is

View File

@ -15,13 +15,31 @@ go_library(
srcs = [ srcs = [
"expansion_generated.go", "expansion_generated.go",
"job.go", "job.go",
"job_expansion.go",
"scheduledjob.go", "scheduledjob.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library", "//pkg/api/errors:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/apis/batch:go_default_library", "//pkg/apis/batch:go_default_library",
"//pkg/client/cache:go_default_library", "//pkg/client/cache:go_default_library",
"//pkg/labels:go_default_library", "//pkg/labels:go_default_library",
], ],
) )
go_test(
name = "go_default_test",
srcs = ["job_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/util/sets:go_default_library",
],
)

View File

@ -18,14 +18,6 @@ limitations under the License.
package internalversion package internalversion
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface{}
// JobNamespaceListerExpansion allows custom methods to be added to
// JobNamespaeLister.
type JobNamespaceListerExpansion interface{}
// ScheduledJobListerExpansion allows custom methods to be added to // ScheduledJobListerExpansion allows custom methods to be added to
// ScheduledJobLister. // ScheduledJobLister.
type ScheduledJobListerExpansion interface{} type ScheduledJobListerExpansion interface{}

View File

@ -0,0 +1,64 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internalversion
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/labels"
)
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface {
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error)
}
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
func (l *jobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
return
}
var list []*batch.Job
list, err = l.Jobs(pod.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, job := range list {
selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
if !selector.Matches(labels.Set(pod.Labels)) {
continue
}
jobs = append(jobs, *job)
}
if len(jobs) == 0 {
err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// JobNamespaceListerExpansion allows custom methods to be added to
// JobNamespaceLister.
type JobNamespaceListerExpansion interface{}

View File

@ -0,0 +1,219 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internalversion
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)
func TestJobLister(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
lister := NewJobLister(indexer)
testCases := []struct {
inJobs []*batch.Job
list func() ([]*batch.Job, error)
outJobNames sets.String
expectErr bool
msg string
}{
// Basic listing
{
inJobs: []*batch.Job{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func() ([]*batch.Job, error) {
list, err := lister.List(labels.Everything())
return list, err
},
outJobNames: sets.NewString("basic"),
msg: "basic listing failed",
},
// Listing multiple jobs
{
inJobs: []*batch.Job{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
{ObjectMeta: api.ObjectMeta{Name: "complex"}},
{ObjectMeta: api.ObjectMeta{Name: "complex2"}},
},
list: func() ([]*batch.Job, error) {
list, err := lister.List(labels.Everything())
return list, err
},
outJobNames: sets.NewString("basic", "complex", "complex2"),
msg: "listing multiple jobs failed",
},
// No pod labels
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "baz"},
},
},
},
},
list: func() ([]*batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod", Namespace: "ns"},
}
podJobs, err := lister.GetPodJobs(pod)
jobs := make([]*batch.Job, 0, len(podJobs))
for i := range podJobs {
jobs = append(jobs, &podJobs[i])
}
return jobs, err
},
outJobNames: sets.NewString(),
expectErr: true,
msg: "listing jobs failed when pod has no labels: expected error, got none",
},
// No Job selectors
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func() ([]*batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod",
Namespace: "ns",
Labels: map[string]string{"foo": "bar"},
},
}
podJobs, err := lister.GetPodJobs(pod)
jobs := make([]*batch.Job, 0, len(podJobs))
for i := range podJobs {
jobs = append(jobs, &podJobs[i])
}
return jobs, err
},
outJobNames: sets.NewString(),
expectErr: true,
msg: "listing jobs failed when job has no selector: expected error, got none",
},
// Matching labels to selectors and namespace
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
},
list: func() ([]*batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod",
Labels: map[string]string{"foo": "bar"},
Namespace: "ns",
},
}
podJobs, err := lister.GetPodJobs(pod)
jobs := make([]*batch.Job, 0, len(podJobs))
for i := range podJobs {
jobs = append(jobs, &podJobs[i])
}
return jobs, err
},
outJobNames: sets.NewString("bar"),
msg: "listing jobs with namespace and selector failed",
},
// Matching labels to selectors and namespace, error case
{
inJobs: []*batch.Job{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "foo"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "bar"},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
},
},
},
list: func() ([]*batch.Job, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod",
Labels: map[string]string{"foo": "bar"},
Namespace: "baz",
},
}
podJobs, err := lister.GetPodJobs(pod)
jobs := make([]*batch.Job, 0, len(podJobs))
for i := range podJobs {
jobs = append(jobs, &podJobs[i])
}
return jobs, err
},
expectErr: true,
msg: "listing jobs with namespace and selector failed: expected error, got none",
},
}
for _, c := range testCases {
for _, r := range c.inJobs {
indexer.Add(r)
}
Jobs, err := c.list()
if err != nil && c.expectErr {
continue
} else if c.expectErr {
t.Errorf("%v", c.msg)
continue
} else if err != nil {
t.Errorf("Unexpected error %#v", err)
continue
}
JobNames := make([]string, len(Jobs))
for ix := range Jobs {
JobNames[ix] = Jobs[ix].Name
}
if !c.outJobNames.HasAll(JobNames...) || len(JobNames) != len(c.outJobNames) {
t.Errorf("%v : expected %v, got %v", c.msg, JobNames, c.outJobNames)
}
}
}

View File

@ -13,6 +13,7 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"batch.go",
"core.go", "core.go",
"extensions.go", "extensions.go",
"factory.go", "factory.go",
@ -24,11 +25,13 @@ go_library(
deps = [ deps = [
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions:go_default_library",
"//pkg/apis/rbac:go_default_library", "//pkg/apis/rbac:go_default_library",
"//pkg/apis/storage:go_default_library", "//pkg/apis/storage:go_default_library",
"//pkg/client/cache:go_default_library", "//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/listers/batch/internalversion:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/runtime:go_default_library", "//pkg/runtime:go_default_library",
"//pkg/watch:go_default_library", "//pkg/watch:go_default_library",

View File

@ -0,0 +1,83 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"reflect"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
batchinternallisters "k8s.io/kubernetes/pkg/client/listers/batch/internalversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// JobInformer is type of SharedIndexInformer which watches and lists all jobs.
// Interface provides constructor for informer and lister for jobs
type JobInformer interface {
Informer() cache.SharedIndexInformer
Lister() batchinternallisters.JobLister
}
type jobInformer struct {
*sharedInformerFactory
}
// Informer checks whether jobInformer exists in sharedInformerFactory and if not, it creates new informer of type
// jobInformer and connects it to sharedInformerFactory
func (f *jobInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&batch.Job{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = NewJobInformer(f.client, f.defaultResync)
f.informers[informerType] = informer
return informer
}
// NewJobInformer returns a SharedIndexInformer that lists and watches all jobs
func NewJobInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Batch().Jobs(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Batch().Jobs(api.NamespaceAll).Watch(options)
},
},
&batch.Job{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}
// Lister returns lister for jobInformer
func (f *jobInformer) Lister() batchinternallisters.JobLister {
informer := f.Informer()
return batchinternallisters.NewJobLister(informer.GetIndexer())
}

View File

@ -54,6 +54,8 @@ type SharedInformerFactory interface {
Roles() RoleInformer Roles() RoleInformer
StorageClasses() StorageClassInformer StorageClasses() StorageClassInformer
Jobs() JobInformer
} }
type sharedInformerFactory struct { type sharedInformerFactory struct {
@ -158,3 +160,8 @@ func (f *sharedInformerFactory) LimitRanges() LimitRangeInformer {
func (f *sharedInformerFactory) StorageClasses() StorageClassInformer { func (f *sharedInformerFactory) StorageClasses() StorageClassInformer {
return &storageClassInformer{sharedInformerFactory: f} return &storageClassInformer{sharedInformerFactory: f}
} }
// Jobs returns a SharedIndexInformer that lists and watches all storage jobs
func (f *sharedInformerFactory) Jobs() JobInformer {
return &jobInformer{sharedInformerFactory: f}
}

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/rbac" "k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
@ -67,6 +68,9 @@ func (f *sharedInformerFactory) ForResource(resource unversioned.GroupResource)
return &genericInformer{resource: resource, informer: f.RoleBindings().Informer()}, nil return &genericInformer{resource: resource, informer: f.RoleBindings().Informer()}, nil
case rbac.Resource("roles"): case rbac.Resource("roles"):
return &genericInformer{resource: resource, informer: f.Roles().Informer()}, nil return &genericInformer{resource: resource, informer: f.Roles().Informer()}, nil
case batch.Resource("jobs"):
return &genericInformer{resource: resource, informer: f.Jobs().Informer()}, nil
} }
return nil, fmt.Errorf("no informer found for %v", resource) return nil, fmt.Errorf("no informer found for %v", resource)

View File

@ -20,21 +20,20 @@ go_library(
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/apis/batch:go_default_library", "//pkg/apis/batch:go_default_library",
"//pkg/client/cache:go_default_library", "//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/listers/batch/internalversion:go_default_library",
"//pkg/client/record:go_default_library", "//pkg/client/record:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library", "//pkg/controller/informers:go_default_library",
"//pkg/controller/replication:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/metrics:go_default_library", "//pkg/util/metrics:go_default_library",
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/wait:go_default_library", "//pkg/util/wait:go_default_library",
"//pkg/util/workqueue:go_default_library", "//pkg/util/workqueue:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
], ],
) )
@ -52,11 +51,13 @@ go_test(
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/apimachinery/registered:go_default_library", "//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/batch:go_default_library", "//pkg/apis/batch:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/restclient:go_default_library", "//pkg/client/restclient:go_default_library",
"//pkg/client/testing/core:go_default_library", "//pkg/client/testing/core:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/util/rand:go_default_library", "//pkg/util/rand:go_default_library",
"//pkg/util/wait:go_default_library", "//pkg/util/wait:go_default_library",
"//pkg/watch:go_default_library", "//pkg/watch:go_default_library",

View File

@ -24,21 +24,20 @@ import (
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
batchinternallisters "k8s.io/kubernetes/pkg/client/listers/batch/internalversion"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/controller/informers"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -47,27 +46,21 @@ type JobController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podControl controller.PodControlInterface podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewJobController(passing SharedInformer), this
// will be null
internalPodInformer cache.SharedInformer
// To allow injection of updateJobStatus for testing. // To allow injection of updateJobStatus for testing.
updateHandler func(job *batch.Job) error updateHandler func(job *batch.Job) error
syncHandler func(jobKey string) error syncHandler func(jobKey string) error
// podStoreSynced returns true if the pod store has been synced at least once. // podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing. // Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool podStoreSynced cache.InformerSynced
// jobStoreSynced returns true if the job store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
jobStoreSynced cache.InformerSynced
// A TTLCache of pod creates/deletes each rc expects to see // A TTLCache of pod creates/deletes each rc expects to see
expectations controller.ControllerExpectationsInterface expectations controller.ControllerExpectationsInterface
// A store of job, populated by the jobController // A store of jobs
jobStore cache.StoreToJobLister jobLister batchinternallisters.JobLister
// Watches changes to all jobs
jobController *cache.Controller
// A store of pods, populated by the podController // A store of pods, populated by the podController
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
@ -78,7 +71,7 @@ type JobController struct {
recorder record.EventRecorder recorder record.EventRecorder
} }
func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *JobController { func NewJobController(podInformer cache.SharedIndexInformer, jobInformer informers.JobInformer, kubeClient clientset.Interface) *JobController {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
@ -99,19 +92,7 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}), recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
} }
jm.jobStore.Store, jm.jobController = cache.NewInformer( jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
},
},
&batch.Job{},
// TODO: Can we have much longer period here?
replicationcontroller.FullControllerResyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController, AddFunc: jm.enqueueController,
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
if job := cur.(*batch.Job); !IsJobFinished(job) { if job := cur.(*batch.Job); !IsJobFinished(job) {
@ -119,8 +100,9 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
} }
}, },
DeleteFunc: jm.enqueueController, DeleteFunc: jm.enqueueController,
}, })
) jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addPod, AddFunc: jm.addPod,
@ -135,39 +117,26 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
return jm return jm
} }
func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
jm := NewJobController(podInformer, kubeClient)
jm.internalPodInformer = podInformer
return jm
}
// Run the main goroutine responsible for watching and syncing jobs. // Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced) { if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
return return
} }
go jm.jobController.Run(stopCh)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh) go wait.Until(jm.worker, time.Second, stopCh)
} }
if jm.internalPodInformer != nil {
go jm.internalPodInformer.Run(stopCh)
}
<-stopCh <-stopCh
glog.Infof("Shutting down Job Manager") glog.Infof("Shutting down Job Manager")
} }
// getPodJob returns the job managing the given pod. // getPodJob returns the job managing the given pod.
func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job { func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
jobs, err := jm.jobStore.GetPodJobs(pod) jobs, err := jm.jobLister.GetPodJobs(pod)
if err != nil { if err != nil {
glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name) glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
return nil return nil
@ -315,16 +284,23 @@ func (jm *JobController) syncJob(key string) error {
glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime)) glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
}() }()
obj, exists, err := jm.jobStore.Store.GetByKey(key) ns, name, err := cache.SplitMetaNamespaceKey(key)
if !exists { if err != nil {
return err
}
if len(ns) == 0 || len(name) == 0 {
return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
}
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.V(4).Infof("Job has been deleted: %v", key) glog.V(4).Infof("Job has been deleted: %v", key)
jm.expectations.DeleteExpectations(key) jm.expectations.DeleteExpectations(key)
return nil return nil
} }
if err != nil {
return err return err
} }
job := *obj.(*batch.Job) job := *sharedJob
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters // and update the expectations after we've retrieved active pods from the store. If a new pod enters

View File

@ -18,7 +18,6 @@ package job
import ( import (
"fmt" "fmt"
"reflect"
"testing" "testing"
"time" "time"
@ -26,11 +25,13 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/util/rand" "k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -86,6 +87,13 @@ func getKey(job *batch.Job, t *testing.T) string {
} }
} }
func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
jm := NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), kubeClient)
return jm, sharedInformers
}
// create count pods with the given phase for the given job // create count pods with the given phase for the given job
func newPodList(count int32, status api.PodPhase, job *batch.Job) []api.Pod { func newPodList(count int32, status api.PodPhase, job *batch.Job) []api.Pod {
pods := []api.Pod{} pods := []api.Pod{}
@ -220,10 +228,11 @@ func TestControllerSyncJob(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError} fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateHandler = func(job *batch.Job) error { manager.updateHandler = func(job *batch.Job) error {
actual = job actual = job
@ -236,18 +245,19 @@ func TestControllerSyncJob(t *testing.T) {
now := unversioned.Now() now := unversioned.Now()
job.DeletionTimestamp = &now job.DeletionTimestamp = &now
} }
manager.jobStore.Store.Add(job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer()
for _, pod := range newPodList(tc.pendingPods, api.PodPending, job) { for _, pod := range newPodList(tc.pendingPods, api.PodPending, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) { for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) { for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
// run // run
@ -322,10 +332,11 @@ func TestSyncJobPastDeadline(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateHandler = func(job *batch.Job) error { manager.updateHandler = func(job *batch.Job) error {
actual = job actual = job
@ -337,15 +348,16 @@ func TestSyncJobPastDeadline(t *testing.T) {
job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0) start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0)
job.Status.StartTime = &start job.Status.StartTime = &start
manager.jobStore.Store.Add(job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer()
for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) { for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) { for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
manager.podStore.Indexer.Add(&pod) podIndexer.Add(&pod)
} }
// run // run
@ -392,10 +404,11 @@ func getCondition(job *batch.Job, condition batch.JobConditionType) bool {
func TestSyncPastDeadlineJobFinished(t *testing.T) { func TestSyncPastDeadlineJobFinished(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateHandler = func(job *batch.Job) error { manager.updateHandler = func(job *batch.Job) error {
actual = job actual = job
@ -408,7 +421,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0) start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0)
job.Status.StartTime = &start job.Status.StartTime = &start
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline")) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
manager.jobStore.Store.Add(job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t)) err := manager.syncJob(getKey(job, t))
if err != nil { if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err) t.Errorf("Unexpected error when syncing jobs %v", err)
@ -426,23 +439,23 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
func TestSyncJobComplete(t *testing.T) { func TestSyncJobComplete(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
job := newJob(1, 1) job := newJob(1, 1)
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
manager.jobStore.Store.Add(job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t)) err := manager.syncJob(getKey(job, t))
if err != nil { if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err) t.Fatalf("Unexpected error when syncing jobs %v", err)
} }
uncastJob, _, err := manager.jobStore.Store.Get(job) actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
if err != nil { if err != nil {
t.Fatalf("Unexpected error when trying to get job from the store: %v", err) t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
} }
actual := uncastJob.(*batch.Job)
// Verify that after syncing a complete job, the conditions are the same. // Verify that after syncing a complete job, the conditions are the same.
if got, expected := len(actual.Status.Conditions), 1; got != expected { if got, expected := len(actual.Status.Conditions), 1; got != expected {
t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got) t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
@ -451,10 +464,11 @@ func TestSyncJobComplete(t *testing.T) {
func TestSyncJobDeleted(t *testing.T) { func TestSyncJobDeleted(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, _ := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil } manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2) job := newJob(2, 2)
err := manager.syncJob(getKey(job, t)) err := manager.syncJob(getKey(job, t))
@ -471,17 +485,18 @@ func TestSyncJobDeleted(t *testing.T) {
func TestSyncJobUpdateRequeue(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
updateError := fmt.Errorf("Update error") updateError := fmt.Errorf("Update error")
manager.updateHandler = func(job *batch.Job) error { manager.updateHandler = func(job *batch.Job) error {
manager.queue.AddRateLimited(getKey(job, t)) manager.queue.AddRateLimited(getKey(job, t))
return updateError return updateError
} }
job := newJob(2, 2) job := newJob(2, 2)
manager.jobStore.Store.Add(job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t)) err := manager.syncJob(getKey(job, t))
if err == nil || err != updateError { if err == nil || err != updateError {
t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err) t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
@ -496,8 +511,9 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
func TestJobPodLookup(t *testing.T) { func TestJobPodLookup(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
testCases := []struct { testCases := []struct {
job *batch.Job job *batch.Job
pod *api.Pod pod *api.Pod
@ -560,7 +576,7 @@ func TestJobPodLookup(t *testing.T) {
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
manager.jobStore.Add(tc.job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(tc.job)
if job := manager.getPodJob(tc.pod); job != nil { if job := manager.getPodJob(tc.pod); job != nil {
if tc.expectedName != job.Name { if tc.expectedName != job.Name {
t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
@ -586,23 +602,25 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// and checking expectations. // and checking expectations.
func TestSyncJobExpectations(t *testing.T) { func TestSyncJobExpectations(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil } manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2) job := newJob(2, 2)
manager.jobStore.Store.Add(job) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
pods := newPodList(2, api.PodPending, job) pods := newPodList(2, api.PodPending, job)
manager.podStore.Indexer.Add(&pods[0]) podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer()
podIndexer.Add(&pods[0])
manager.expectations = FakeJobExpectations{ manager.expectations = FakeJobExpectations{
controller.NewControllerExpectations(), true, func() { controller.NewControllerExpectations(), true, func() {
// If we check active pods before checking expectataions, the job // If we check active pods before checking expectataions, the job
// will create a new replica because it doesn't see this pod, but // will create a new replica because it doesn't see this pod, but
// has fulfilled its expectations. // has fulfilled its expectations.
manager.podStore.Indexer.Add(&pods[1]) podIndexer.Add(&pods[1])
}, },
} }
manager.syncJob(getKey(job, t)) manager.syncJob(getKey(job, t))
@ -618,8 +636,9 @@ func TestWatchJobs(t *testing.T) {
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var testJob batch.Job var testJob batch.Job
received := make(chan struct{}) received := make(chan struct{})
@ -627,28 +646,30 @@ func TestWatchJobs(t *testing.T) {
// The update sent through the fakeWatcher should make its way into the workqueue, // The update sent through the fakeWatcher should make its way into the workqueue,
// and eventually into the syncHandler. // and eventually into the syncHandler.
manager.syncHandler = func(key string) error { manager.syncHandler = func(key string) error {
defer close(received)
obj, exists, err := manager.jobStore.Store.GetByKey(key) ns, name, err := cache.SplitMetaNamespaceKey(key)
if !exists || err != nil { if err != nil {
t.Errorf("Expected to find job under key %v", key) t.Errorf("Error getting namespace/name from key %v: %v", key, err)
} }
job, ok := obj.(*batch.Job) job, err := manager.jobLister.Jobs(ns).Get(name)
if !ok { if err != nil || job == nil {
t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj) t.Errorf("Expected to find job under key %v: %v", key, err)
return nil
} }
if !api.Semantic.DeepDerivative(*job, testJob) { if !api.Semantic.DeepDerivative(*job, testJob) {
t.Errorf("Expected %#v, but got %#v", testJob, *job) t.Errorf("Expected %#v, but got %#v", testJob, *job)
} }
close(received)
return nil return nil
} }
// Start only the job watcher and the workqueue, send a watch event, // Start only the job watcher and the workqueue, send a watch event,
// and make sure it hits the sync method. // and make sure it hits the sync method.
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
sharedInformerFactory.Start(stopCh)
go manager.Run(1, stopCh) go manager.Run(1, stopCh)
// We're sending new job to see if it reaches syncHandler. // We're sending new job to see if it reaches syncHandler.
testJob.Namespace = "bar"
testJob.Name = "foo" testJob.Name = "foo"
fakeWatch.Add(&testJob) fakeWatch.Add(&testJob)
t.Log("Waiting for job to reach syncHandler") t.Log("Waiting for job to reach syncHandler")
@ -660,26 +681,23 @@ func TestWatchPods(t *testing.T) {
clientset := fake.NewSimpleClientset(testJob) clientset := fake.NewSimpleClientset(testJob)
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
// Put one job and one pod into the store // Put one job and one pod into the store
manager.jobStore.Store.Add(testJob) sharedInformerFactory.Jobs().Informer().GetIndexer().Add(testJob)
received := make(chan struct{}) received := make(chan struct{})
// The pod update sent through the fakeWatcher should figure out the managing job and // The pod update sent through the fakeWatcher should figure out the managing job and
// send it into the syncHandler. // send it into the syncHandler.
manager.syncHandler = func(key string) error { manager.syncHandler = func(key string) error {
obj, exists, err := manager.jobStore.Store.GetByKey(key) ns, name, err := cache.SplitMetaNamespaceKey(key)
if !exists || err != nil { if err != nil {
t.Errorf("Expected to find job under key %v", key) t.Errorf("Error getting namespace/name from key %v: %v", key, err)
close(received)
return nil
} }
job, ok := obj.(*batch.Job) job, err := manager.jobLister.Jobs(ns).Get(name)
if !ok { if err != nil {
t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj) t.Errorf("Expected to find job under key %v: %v", key, err)
close(received)
return nil
} }
if !api.Semantic.DeepDerivative(job, testJob) { if !api.Semantic.DeepDerivative(job, testJob) {
t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
@ -693,7 +711,7 @@ func TestWatchPods(t *testing.T) {
// and make sure it hits the sync method for the right job. // and make sure it hits the sync method for the right job.
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
go manager.internalPodInformer.Run(stopCh) go sharedInformerFactory.Pods().Informer().Run(stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(1, api.PodRunning, testJob) pods := newPodList(1, api.PodRunning, testJob)

View File

@ -535,6 +535,7 @@ k8s.io/kubernetes/pkg/auth/handlers,liggitt,0
k8s.io/kubernetes/pkg/client/cache,xiang90,1 k8s.io/kubernetes/pkg/client/cache,xiang90,1
k8s.io/kubernetes/pkg/client/chaosclient,deads2k,1 k8s.io/kubernetes/pkg/client/chaosclient,deads2k,1
k8s.io/kubernetes/pkg/client/leaderelection,xiang90,1 k8s.io/kubernetes/pkg/client/leaderelection,xiang90,1
k8s.io/kubernetes/pkg/client/listers/batch/internalversion,mqliang,0
k8s.io/kubernetes/pkg/client/record,karlkfi,1 k8s.io/kubernetes/pkg/client/record,karlkfi,1
k8s.io/kubernetes/pkg/client/restclient,kargakis,1 k8s.io/kubernetes/pkg/client/restclient,kargakis,1
k8s.io/kubernetes/pkg/client/retry,caesarxuchao,1 k8s.io/kubernetes/pkg/client/retry,caesarxuchao,1

1 name owner auto-assigned
535 k8s.io/kubernetes/pkg/client/cache xiang90 1
536 k8s.io/kubernetes/pkg/client/chaosclient deads2k 1
537 k8s.io/kubernetes/pkg/client/leaderelection xiang90 1
538 k8s.io/kubernetes/pkg/client/listers/batch/internalversion mqliang 0
539 k8s.io/kubernetes/pkg/client/record karlkfi 1
540 k8s.io/kubernetes/pkg/client/restclient kargakis 1
541 k8s.io/kubernetes/pkg/client/retry caesarxuchao 1