Merge pull request #36080 from ncdc/lister-gen
Automatic merge from submit-queue

lister-gen updates

- Remove "zz_generated." prefix from generated lister file names
- Add support for expansion interfaces
- Switch to new generated JobLister

@deads2k @liggitt @sttts @mikedanese @caesarxuchao for the lister-gen changes
@soltysh @deads2k for the informer / job controller changes
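For orientation, a minimal sketch of how a caller wires the job controller after this change, using only constructors that appear in the diff below. It is not part of this PR; the package name, helper name, resync period, and worker count are illustrative.

package example // illustrative sketch, not part of this PR

import (
    "time"

    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/controller/informers"
    "k8s.io/kubernetes/pkg/controller/job"
)

// startJobController shows the new wiring: the Job informer and lister come
// from the shared informer factory instead of a private cache.NewInformer
// owned by the controller.
func startJobController(kubeClient clientset.Interface, stopCh <-chan struct{}) {
    sharedInformers := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) // resync period illustrative
    jm := job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), kubeClient)
    sharedInformers.Start(stopCh) // starts the shared Pod and Job informers
    go jm.Run(5, stopCh)          // run the controller workers until stopCh closes
}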
@@ -13,6 +13,7 @@ load(
go_library(
    name = "go_default_library",
    srcs = [
        "batch.go",
        "core.go",
        "extensions.go",
        "factory.go",
@@ -24,11 +25,13 @@ go_library(
    deps = [
        "//pkg/api:go_default_library",
        "//pkg/api/unversioned:go_default_library",
        "//pkg/apis/batch:go_default_library",
        "//pkg/apis/extensions:go_default_library",
        "//pkg/apis/rbac:go_default_library",
        "//pkg/apis/storage:go_default_library",
        "//pkg/client/cache:go_default_library",
        "//pkg/client/clientset_generated/internalclientset:go_default_library",
        "//pkg/client/listers/batch/internalversion:go_default_library",
        "//pkg/client/listers/core/internalversion:go_default_library",
        "//pkg/runtime:go_default_library",
        "//pkg/watch:go_default_library",
pkg/controller/informers/batch.go (new file, 83 lines)
@@ -0,0 +1,83 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
    "reflect"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/batch"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    batchinternallisters "k8s.io/kubernetes/pkg/client/listers/batch/internalversion"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/watch"
)

// JobInformer is a type of SharedIndexInformer that watches and lists all jobs.
// The interface provides a constructor for the informer and a lister for jobs.
type JobInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() batchinternallisters.JobLister
}

type jobInformer struct {
    *sharedInformerFactory
}

// Informer checks whether a job informer already exists in the sharedInformerFactory
// and, if not, creates a new jobInformer and registers it with the sharedInformerFactory.
func (f *jobInformer) Informer() cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(&batch.Job{})
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }
    informer = NewJobInformer(f.client, f.defaultResync)
    f.informers[informerType] = informer

    return informer
}

// NewJobInformer returns a SharedIndexInformer that lists and watches all jobs.
func NewJobInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    sharedIndexInformer := cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return client.Batch().Jobs(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return client.Batch().Jobs(api.NamespaceAll).Watch(options)
            },
        },
        &batch.Job{},
        resyncPeriod,
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )

    return sharedIndexInformer
}

// Lister returns a JobLister backed by the jobInformer's shared index.
func (f *jobInformer) Lister() batchinternallisters.JobLister {
    informer := f.Informer()
    return batchinternallisters.NewJobLister(informer.GetIndexer())
}
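A hypothetical consumer-side sketch of the Informer/Lister pair defined above (the helper is not part of this PR): reads go through the generated JobLister backed by the shared index, mirroring what JobController.syncJob does later in this diff.

package example // illustrative sketch, not part of this PR

import (
    "k8s.io/kubernetes/pkg/apis/batch"
    "k8s.io/kubernetes/pkg/controller/informers"
)

// getCachedJob reads a Job from the shared informer cache via the generated
// JobLister; it never hits the API server.
func getCachedJob(f informers.SharedInformerFactory, namespace, name string) (*batch.Job, error) {
    return f.Jobs().Lister().Jobs(namespace).Get(name)
}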
@@ -54,6 +54,8 @@ type SharedInformerFactory interface {
    Roles() RoleInformer

    StorageClasses() StorageClassInformer

    Jobs() JobInformer
}

type sharedInformerFactory struct {
@@ -158,3 +160,8 @@ func (f *sharedInformerFactory) LimitRanges() LimitRangeInformer {
func (f *sharedInformerFactory) StorageClasses() StorageClassInformer {
    return &storageClassInformer{sharedInformerFactory: f}
}

// Jobs returns a JobInformer that lists and watches all jobs.
func (f *sharedInformerFactory) Jobs() JobInformer {
    return &jobInformer{sharedInformerFactory: f}
}
@@ -21,6 +21,7 @@ import (

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/apis/batch"
    "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/apis/rbac"
    "k8s.io/kubernetes/pkg/client/cache"
@@ -67,6 +68,9 @@ func (f *sharedInformerFactory) ForResource(resource unversioned.GroupResource)
        return &genericInformer{resource: resource, informer: f.RoleBindings().Informer()}, nil
    case rbac.Resource("roles"):
        return &genericInformer{resource: resource, informer: f.Roles().Informer()}, nil

    case batch.Resource("jobs"):
        return &genericInformer{resource: resource, informer: f.Jobs().Informer()}, nil
    }

    return nil, fmt.Errorf("no informer found for %v", resource)
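A hedged sketch of exercising the new case above. The ForResource return type is not shown in this excerpt, so a generic informer value with an Informer() accessor is assumed (as the genericInformer struct literal suggests), and it is also assumed that ForResource is exposed on the factory interface.

package example // illustrative sketch, not part of this PR

import (
    "k8s.io/kubernetes/pkg/apis/batch"
    "k8s.io/kubernetes/pkg/controller/informers"
)

// startInformerFor resolves an informer by GroupResource at runtime instead of
// calling the typed Jobs() accessor; illustrative only.
func startInformerFor(f informers.SharedInformerFactory, stopCh <-chan struct{}) error {
    gi, err := f.ForResource(batch.Resource("jobs")) // assumes ForResource is on the factory interface
    if err != nil {
        return err
    }
    go gi.Informer().Run(stopCh) // Informer() accessor assumed from the genericInformer field above
    return nil
}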
@@ -20,21 +20,20 @@ go_library(
    tags = ["automanaged"],
    deps = [
        "//pkg/api:go_default_library",
        "//pkg/api/errors:go_default_library",
        "//pkg/api/unversioned:go_default_library",
        "//pkg/apis/batch:go_default_library",
        "//pkg/client/cache:go_default_library",
        "//pkg/client/clientset_generated/internalclientset:go_default_library",
        "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
        "//pkg/client/listers/batch/internalversion:go_default_library",
        "//pkg/client/record:go_default_library",
        "//pkg/controller:go_default_library",
        "//pkg/controller/informers:go_default_library",
        "//pkg/controller/replication:go_default_library",
        "//pkg/runtime:go_default_library",
        "//pkg/util/metrics:go_default_library",
        "//pkg/util/runtime:go_default_library",
        "//pkg/util/wait:go_default_library",
        "//pkg/util/workqueue:go_default_library",
        "//pkg/watch:go_default_library",
        "//vendor:github.com/golang/glog",
    ],
)
@@ -52,11 +51,13 @@ go_test(
        "//pkg/api/unversioned:go_default_library",
        "//pkg/apimachinery/registered:go_default_library",
        "//pkg/apis/batch:go_default_library",
        "//pkg/client/cache:go_default_library",
        "//pkg/client/clientset_generated/internalclientset:go_default_library",
        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
        "//pkg/client/restclient:go_default_library",
        "//pkg/client/testing/core:go_default_library",
        "//pkg/controller:go_default_library",
        "//pkg/controller/informers:go_default_library",
        "//pkg/util/rand:go_default_library",
        "//pkg/util/wait:go_default_library",
        "//pkg/watch:go_default_library",
@@ -24,21 +24,20 @@ import (
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/errors"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/apis/batch"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
    batchinternallisters "k8s.io/kubernetes/pkg/client/listers/batch/internalversion"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/informers"
    replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/util/metrics"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/pkg/util/workqueue"
    "k8s.io/kubernetes/pkg/watch"

    "github.com/golang/glog"
)
@@ -47,27 +46,21 @@ type JobController struct {
    kubeClient clientset.Interface
    podControl controller.PodControlInterface

    // internalPodInformer is used to hold a personal informer. If we're using
    // a normal shared informer, then the informer will be started for us. If
    // we have a personal informer, we must start it ourselves. If you start
    // the controller using NewJobController(passing SharedInformer), this
    // will be null
    internalPodInformer cache.SharedInformer

    // To allow injection of updateJobStatus for testing.
    updateHandler func(job *batch.Job) error
    syncHandler   func(jobKey string) error
    // podStoreSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podStoreSynced func() bool
    podStoreSynced cache.InformerSynced
    // jobStoreSynced returns true if the job store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    jobStoreSynced cache.InformerSynced

    // A TTLCache of pod creates/deletes each rc expects to see
    expectations controller.ControllerExpectationsInterface

    // A store of job, populated by the jobController
    jobStore cache.StoreToJobLister
    // Watches changes to all jobs
    jobController *cache.Controller
    // A store of jobs
    jobLister batchinternallisters.JobLister

    // A store of pods, populated by the podController
    podStore cache.StoreToPodLister
@@ -78,7 +71,7 @@ type JobController struct {
    recorder record.EventRecorder
}

func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *JobController {
func NewJobController(podInformer cache.SharedIndexInformer, jobInformer informers.JobInformer, kubeClient clientset.Interface) *JobController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    // TODO: remove the wrapper when every clients have moved to use the clientset.
@@ -99,28 +92,17 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
        recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
    }

    jm.jobStore.Store, jm.jobController = cache.NewInformer(
        &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
            },
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: jm.enqueueController,
        UpdateFunc: func(old, cur interface{}) {
            if job := cur.(*batch.Job); !IsJobFinished(job) {
                jm.enqueueController(job)
            }
        },
        &batch.Job{},
        // TODO: Can we have much longer period here?
        replicationcontroller.FullControllerResyncPeriod,
        cache.ResourceEventHandlerFuncs{
            AddFunc: jm.enqueueController,
            UpdateFunc: func(old, cur interface{}) {
                if job := cur.(*batch.Job); !IsJobFinished(job) {
                    jm.enqueueController(job)
                }
            },
            DeleteFunc: jm.enqueueController,
        },
    )
        DeleteFunc: jm.enqueueController,
    })
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: jm.addPod,
@@ -135,39 +117,26 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
    return jm
}

func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
    podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
    jm := NewJobController(podInformer, kubeClient)
    jm.internalPodInformer = podInformer

    return jm
}

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer jm.queue.ShutDown()

    if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced) {
    if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }

    go jm.jobController.Run(stopCh)
    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    if jm.internalPodInformer != nil {
        go jm.internalPodInformer.Run(stopCh)
    }

    <-stopCh
    glog.Infof("Shutting down Job Manager")
}

// getPodJob returns the job managing the given pod.
func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
    jobs, err := jm.jobStore.GetPodJobs(pod)
    jobs, err := jm.jobLister.GetPodJobs(pod)
    if err != nil {
        glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
        return nil
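The GetPodJobs call above is an example of the "expansion interfaces" mentioned in the PR description: hand-written methods that lister-gen attaches to the generated JobLister alongside the generated accessors. A hedged sketch of its likely shape follows; the interface name and method signature are assumed, since the generated lister file is not part of this excerpt.

package example // illustrative sketch, not part of this PR

import (
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/batch"
)

// JobListerExpansion allows custom methods to be added to the generated
// JobLister. The method signature below is assumed for illustration.
type JobListerExpansion interface {
    // GetPodJobs returns the jobs whose selectors match the given pod.
    GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error)
}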
@@ -315,16 +284,23 @@ func (jm *JobController) syncJob(key string) error {
        glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
    }()

    obj, exists, err := jm.jobStore.Store.GetByKey(key)
    if !exists {
        glog.V(4).Infof("Job has been deleted: %v", key)
        jm.expectations.DeleteExpectations(key)
        return nil
    }
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    job := *obj.(*batch.Job)
    if len(ns) == 0 || len(name) == 0 {
        return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            glog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return nil
        }
        return err
    }
    job := *sharedJob

    // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
    // and update the expectations after we've retrieved active pods from the store. If a new pod enters

@@ -18,7 +18,6 @@ package job

import (
    "fmt"
    "reflect"
    "testing"
    "time"

@@ -26,11 +25,13 @@ import (
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/apimachinery/registered"
    "k8s.io/kubernetes/pkg/apis/batch"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
    "k8s.io/kubernetes/pkg/client/restclient"
    "k8s.io/kubernetes/pkg/client/testing/core"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/informers"
    "k8s.io/kubernetes/pkg/util/rand"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/pkg/watch"
@@ -86,6 +87,13 @@ func getKey(job *batch.Job, t *testing.T) string {
    }
}

func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
    sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
    jm := NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), kubeClient)

    return jm, sharedInformers
}

// create count pods with the given phase for the given job
func newPodList(count int32, status api.PodPhase, job *batch.Job) []api.Pod {
    pods := []api.Pod{}
@@ -220,10 +228,11 @@ func TestControllerSyncJob(t *testing.T) {
    for name, tc := range testCases {
        // job manager setup
        clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
        manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
        manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
        fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
        manager.podControl = &fakePodControl
        manager.podStoreSynced = alwaysReady
        manager.jobStoreSynced = alwaysReady
        var actual *batch.Job
        manager.updateHandler = func(job *batch.Job) error {
            actual = job
@@ -236,18 +245,19 @@ func TestControllerSyncJob(t *testing.T) {
            now := unversioned.Now()
            job.DeletionTimestamp = &now
        }
        manager.jobStore.Store.Add(job)
        sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
        podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer()
        for _, pod := range newPodList(tc.pendingPods, api.PodPending, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }
        for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }
        for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }
        for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }

        // run
@@ -322,10 +332,11 @@ func TestSyncJobPastDeadline(t *testing.T) {
    for name, tc := range testCases {
        // job manager setup
        clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
        manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
        manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
        fakePodControl := controller.FakePodControl{}
        manager.podControl = &fakePodControl
        manager.podStoreSynced = alwaysReady
        manager.jobStoreSynced = alwaysReady
        var actual *batch.Job
        manager.updateHandler = func(job *batch.Job) error {
            actual = job
@@ -337,15 +348,16 @@ func TestSyncJobPastDeadline(t *testing.T) {
        job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
        start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0)
        job.Status.StartTime = &start
        manager.jobStore.Store.Add(job)
        sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
        podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer()
        for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }
        for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }
        for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
            manager.podStore.Indexer.Add(&pod)
            podIndexer.Add(&pod)
        }

        // run
@@ -392,10 +404,11 @@ func getCondition(job *batch.Job, condition batch.JobConditionType) bool {

func TestSyncPastDeadlineJobFinished(t *testing.T) {
    clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    fakePodControl := controller.FakePodControl{}
    manager.podControl = &fakePodControl
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady
    var actual *batch.Job
    manager.updateHandler = func(job *batch.Job) error {
        actual = job
@@ -408,7 +421,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
    start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0)
    job.Status.StartTime = &start
    job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
    manager.jobStore.Store.Add(job)
    sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
    err := manager.syncJob(getKey(job, t))
    if err != nil {
        t.Errorf("Unexpected error when syncing jobs %v", err)
@@ -426,23 +439,23 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {

func TestSyncJobComplete(t *testing.T) {
    clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    fakePodControl := controller.FakePodControl{}
    manager.podControl = &fakePodControl
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady

    job := newJob(1, 1)
    job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
    manager.jobStore.Store.Add(job)
    sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
    err := manager.syncJob(getKey(job, t))
    if err != nil {
        t.Fatalf("Unexpected error when syncing jobs %v", err)
    }
    uncastJob, _, err := manager.jobStore.Store.Get(job)
    actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
    if err != nil {
        t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
    }
    actual := uncastJob.(*batch.Job)
    // Verify that after syncing a complete job, the conditions are the same.
    if got, expected := len(actual.Status.Conditions), 1; got != expected {
        t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
@@ -451,10 +464,11 @@ func TestSyncJobComplete(t *testing.T) {

func TestSyncJobDeleted(t *testing.T) {
    clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, _ := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    fakePodControl := controller.FakePodControl{}
    manager.podControl = &fakePodControl
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady
    manager.updateHandler = func(job *batch.Job) error { return nil }
    job := newJob(2, 2)
    err := manager.syncJob(getKey(job, t))
@@ -471,17 +485,18 @@ func TestSyncJobDeleted(t *testing.T) {

func TestSyncJobUpdateRequeue(t *testing.T) {
    clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    fakePodControl := controller.FakePodControl{}
    manager.podControl = &fakePodControl
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady
    updateError := fmt.Errorf("Update error")
    manager.updateHandler = func(job *batch.Job) error {
        manager.queue.AddRateLimited(getKey(job, t))
        return updateError
    }
    job := newJob(2, 2)
    manager.jobStore.Store.Add(job)
    sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
    err := manager.syncJob(getKey(job, t))
    if err == nil || err != updateError {
        t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
@@ -496,8 +511,9 @@ func TestSyncJobUpdateRequeue(t *testing.T) {

func TestJobPodLookup(t *testing.T) {
    clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady
    testCases := []struct {
        job *batch.Job
        pod *api.Pod
@@ -560,7 +576,7 @@ func TestJobPodLookup(t *testing.T) {
        },
    }
    for _, tc := range testCases {
        manager.jobStore.Add(tc.job)
        sharedInformerFactory.Jobs().Informer().GetIndexer().Add(tc.job)
        if job := manager.getPodJob(tc.pod); job != nil {
            if tc.expectedName != job.Name {
                t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
@@ -586,23 +602,25 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// and checking expectations.
func TestSyncJobExpectations(t *testing.T) {
    clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    fakePodControl := controller.FakePodControl{}
    manager.podControl = &fakePodControl
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady
    manager.updateHandler = func(job *batch.Job) error { return nil }

    job := newJob(2, 2)
    manager.jobStore.Store.Add(job)
    sharedInformerFactory.Jobs().Informer().GetIndexer().Add(job)
    pods := newPodList(2, api.PodPending, job)
    manager.podStore.Indexer.Add(&pods[0])
    podIndexer := sharedInformerFactory.Pods().Informer().GetIndexer()
    podIndexer.Add(&pods[0])

    manager.expectations = FakeJobExpectations{
        controller.NewControllerExpectations(), true, func() {
            // If we check active pods before checking expectations, the job
            // will create a new replica because it doesn't see this pod, but
            // has fulfilled its expectations.
            manager.podStore.Indexer.Add(&pods[1])
            podIndexer.Add(&pods[1])
        },
    }
    manager.syncJob(getKey(job, t))
@@ -618,8 +636,9 @@ func TestWatchJobs(t *testing.T) {
    clientset := fake.NewSimpleClientset()
    fakeWatch := watch.NewFake()
    clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady

    var testJob batch.Job
    received := make(chan struct{})
@@ -627,28 +646,30 @@ func TestWatchJobs(t *testing.T) {
    // The update sent through the fakeWatcher should make its way into the workqueue,
    // and eventually into the syncHandler.
    manager.syncHandler = func(key string) error {

        obj, exists, err := manager.jobStore.Store.GetByKey(key)
        if !exists || err != nil {
            t.Errorf("Expected to find job under key %v", key)
        defer close(received)
        ns, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            t.Errorf("Error getting namespace/name from key %v: %v", key, err)
        }
        job, ok := obj.(*batch.Job)
        if !ok {
            t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
        job, err := manager.jobLister.Jobs(ns).Get(name)
        if err != nil || job == nil {
            t.Errorf("Expected to find job under key %v: %v", key, err)
            return nil
        }
        if !api.Semantic.DeepDerivative(*job, testJob) {
            t.Errorf("Expected %#v, but got %#v", testJob, *job)
        }
        close(received)
        return nil
    }
    // Start only the job watcher and the workqueue, send a watch event,
    // and make sure it hits the sync method.
    stopCh := make(chan struct{})
    defer close(stopCh)
    sharedInformerFactory.Start(stopCh)
    go manager.Run(1, stopCh)

    // We're sending new job to see if it reaches syncHandler.
    testJob.Namespace = "bar"
    testJob.Name = "foo"
    fakeWatch.Add(&testJob)
    t.Log("Waiting for job to reach syncHandler")
@@ -660,26 +681,23 @@ func TestWatchPods(t *testing.T) {
    clientset := fake.NewSimpleClientset(testJob)
    fakeWatch := watch.NewFake()
    clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
    manager.podStoreSynced = alwaysReady
    manager.jobStoreSynced = alwaysReady

    // Put one job and one pod into the store
    manager.jobStore.Store.Add(testJob)
    sharedInformerFactory.Jobs().Informer().GetIndexer().Add(testJob)
    received := make(chan struct{})
    // The pod update sent through the fakeWatcher should figure out the managing job and
    // send it into the syncHandler.
    manager.syncHandler = func(key string) error {
        obj, exists, err := manager.jobStore.Store.GetByKey(key)
        if !exists || err != nil {
            t.Errorf("Expected to find job under key %v", key)
            close(received)
            return nil
        ns, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            t.Errorf("Error getting namespace/name from key %v: %v", key, err)
        }
        job, ok := obj.(*batch.Job)
        if !ok {
            t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
            close(received)
            return nil
        job, err := manager.jobLister.Jobs(ns).Get(name)
        if err != nil {
            t.Errorf("Expected to find job under key %v: %v", key, err)
        }
        if !api.Semantic.DeepDerivative(job, testJob) {
            t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
@@ -693,7 +711,7 @@ func TestWatchPods(t *testing.T) {
    // and make sure it hits the sync method for the right job.
    stopCh := make(chan struct{})
    defer close(stopCh)
    go manager.internalPodInformer.Run(stopCh)
    go sharedInformerFactory.Pods().Informer().Run(stopCh)
    go wait.Until(manager.worker, 10*time.Millisecond, stopCh)

    pods := newPodList(1, api.PodRunning, testJob)