Merge pull request #78944 from avorima/golint_fix_job

Fix golint errors in pkg/controller/job
This commit is contained in:
Kubernetes Prow Robot 2020-04-12 21:57:47 -07:00 committed by GitHub
commit b17ddac4df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 55 additions and 50 deletions

View File

@ -34,7 +34,7 @@ func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] {
return nil, false, nil return nil, false, nil
} }
go job.NewJobController( go job.NewController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(), ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"), ctx.ClientBuilder.ClientOrDie("job-controller"),

View File

@ -64,7 +64,6 @@ pkg/controller/endpoint/config/v1alpha1
pkg/controller/endpointslice/config/v1alpha1 pkg/controller/endpointslice/config/v1alpha1
pkg/controller/garbagecollector pkg/controller/garbagecollector
pkg/controller/garbagecollector/config/v1alpha1 pkg/controller/garbagecollector/config/v1alpha1
pkg/controller/job
pkg/controller/job/config/v1alpha1 pkg/controller/job/config/v1alpha1
pkg/controller/namespace pkg/controller/namespace
pkg/controller/namespace/config/v1alpha1 pkg/controller/namespace/config/v1alpha1

View File

@ -61,7 +61,9 @@ var (
MaxJobBackOff = 360 * time.Second MaxJobBackOff = 360 * time.Second
) )
type JobController struct { // Controller ensures that all Job objects have corresponding pods to
// run their configured workload.
type Controller struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podControl controller.PodControlInterface podControl controller.PodControlInterface
@ -90,7 +92,9 @@ type JobController struct {
recorder record.EventRecorder recorder record.EventRecorder
} }
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController { // NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.
func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@ -99,7 +103,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
} }
jm := &JobController{ jm := &Controller{
kubeClient: kubeClient, kubeClient: kubeClient,
podControl: controller.RealPodControl{ podControl: controller.RealPodControl{
KubeClient: kubeClient, KubeClient: kubeClient,
@ -137,7 +141,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
} }
// Run the main goroutine responsible for watching and syncing jobs. // Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
@ -156,7 +160,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
} }
// getPodJobs returns a list of Jobs that potentially match a Pod. // getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job { func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job {
jobs, err := jm.jobLister.GetPodJobs(pod) jobs, err := jm.jobLister.GetPodJobs(pod)
if err != nil { if err != nil {
return nil return nil
@ -176,7 +180,7 @@ func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
// resolveControllerRef returns the controller referenced by a ControllerRef, // resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller // or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind. // of the correct Kind.
func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job { func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
// We can't look up by UID, so look up by Name and then verify UID. // We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind. // Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind { if controllerRef.Kind != controllerKind.Kind {
@ -195,7 +199,7 @@ func (jm *JobController) resolveControllerRef(namespace string, controllerRef *m
} }
// When a pod is created, enqueue the controller that manages it and update it's expectations. // When a pod is created, enqueue the controller that manages it and update it's expectations.
func (jm *JobController) addPod(obj interface{}) { func (jm *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod) pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil { if pod.DeletionTimestamp != nil {
// on a restart of the controller controller, it's possible a new pod shows up in a state that // on a restart of the controller controller, it's possible a new pod shows up in a state that
@ -231,7 +235,7 @@ func (jm *JobController) addPod(obj interface{}) {
// When a pod is updated, figure out what job/s manage it and wake them up. // When a pod is updated, figure out what job/s manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old // If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *v1.Pod types. // and new job. old and cur must be *v1.Pod types.
func (jm *JobController) updatePod(old, cur interface{}) { func (jm *Controller) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod) curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod) oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion { if curPod.ResourceVersion == oldPod.ResourceVersion {
@ -283,7 +287,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
// When a pod is deleted, enqueue the job that manages the pod and update its expectations. // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (jm *JobController) deletePod(obj interface{}) { func (jm *Controller) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not // When a delete is dropped, the relist will notice a pod in the store not
@ -320,7 +324,7 @@ func (jm *JobController) deletePod(obj interface{}) {
jm.enqueueController(job, true) jm.enqueueController(job, true)
} }
func (jm *JobController) updateJob(old, cur interface{}) { func (jm *Controller) updateJob(old, cur interface{}) {
oldJob := old.(*batch.Job) oldJob := old.(*batch.Job)
curJob := cur.(*batch.Job) curJob := cur.(*batch.Job)
@ -352,7 +356,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item, // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
// immediate tells the controller to update the status right away, and should // immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run. // happen ONLY when there was a successful pod run.
func (jm *JobController) enqueueController(obj interface{}, immediate bool) { func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
key, err := controller.KeyFunc(obj) key, err := controller.KeyFunc(obj)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
@ -375,12 +379,12 @@ func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key. // It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() { func (jm *Controller) worker() {
for jm.processNextWorkItem() { for jm.processNextWorkItem() {
} }
} }
func (jm *JobController) processNextWorkItem() bool { func (jm *Controller) processNextWorkItem() bool {
key, quit := jm.queue.Get() key, quit := jm.queue.Get()
if quit { if quit {
return false return false
@ -404,7 +408,7 @@ func (jm *JobController) processNextWorkItem() bool {
// getPodsForJob returns the set of pods that this Job should manage. // getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning. // It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache. // Note that the returned Pods are pointers into the cache.
func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) { func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err) return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@ -434,7 +438,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key. // concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) { func (jm *Controller) syncJob(key string) (bool, error) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
@ -601,7 +605,7 @@ func (jm *JobController) syncJob(key string) (bool, error) {
return forget, manageJobErr return forget, manageJobErr
} }
func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) { func (jm *Controller) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
// TODO: below code should be replaced with pod termination resulting in // TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution // pod failures, rather than killing pods. Unfortunately none such solution
// exists ATM. There's an open discussion in the topic in // exists ATM. There's an open discussion in the topic in
@ -683,7 +687,7 @@ func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
// manageJob is the core method responsible for managing the number of running // manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec. // pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>. // Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) { func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
var activeLock sync.Mutex var activeLock sync.Mutex
active := int32(len(activePods)) active := int32(len(activePods))
parallelism := *job.Spec.Parallelism parallelism := *job.Spec.Parallelism
@ -822,7 +826,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
return active, nil return active, nil
} }
func (jm *JobController) updateJobStatus(job *batch.Job) error { func (jm *Controller) updateJobStatus(job *batch.Job) error {
jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace) jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
var err error var err error
for i := 0; i <= statusUpdateRetries; i = i + 1 { for i := 0; i <= statusUpdateRetries; i = i + 1 {

View File

@ -88,9 +88,9 @@ func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
return j return j
} }
func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) { func newControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient) jm := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
jm.podControl = &controller.FakePodControl{} jm.podControl = &controller.FakePodControl{}
return jm, sharedInformers return jm, sharedInformers
@ -272,7 +272,7 @@ func TestControllerSyncJob(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -413,7 +413,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -479,7 +479,7 @@ func getCondition(job *batch.Job, condition batch.JobConditionType, reason strin
func TestSyncPastDeadlineJobFinished(t *testing.T) { func TestSyncPastDeadlineJobFinished(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -517,7 +517,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
func TestSyncJobComplete(t *testing.T) { func TestSyncJobComplete(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -545,7 +545,7 @@ func TestSyncJobComplete(t *testing.T) {
func TestSyncJobDeleted(t *testing.T) { func TestSyncJobDeleted(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, _ := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -570,7 +570,7 @@ func TestSyncJobDeleted(t *testing.T) {
func TestSyncJobUpdateRequeue(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -599,7 +599,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
func TestJobPodLookup(t *testing.T) { func TestJobPodLookup(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
testCases := []struct { testCases := []struct {
@ -681,7 +681,7 @@ func TestJobPodLookup(t *testing.T) {
func TestGetPodsForJob(t *testing.T) { func TestGetPodsForJob(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -729,7 +729,7 @@ func TestGetPodsForJobAdopt(t *testing.T) {
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6)
job1.Name = "job1" job1.Name = "job1"
clientset := fake.NewSimpleClientset(job1) clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -756,7 +756,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
job1.Name = "job1" job1.Name = "job1"
job1.DeletionTimestamp = &metav1.Time{} job1.DeletionTimestamp = &metav1.Time{}
clientset := fake.NewSimpleClientset(job1) clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -787,7 +787,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
// The up-to-date object says it's being deleted. // The up-to-date object says it's being deleted.
job1.DeletionTimestamp = &metav1.Time{} job1.DeletionTimestamp = &metav1.Time{}
clientset := fake.NewSimpleClientset(job1) clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -817,7 +817,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
func TestGetPodsForJobRelease(t *testing.T) { func TestGetPodsForJobRelease(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -846,7 +846,7 @@ func TestGetPodsForJobRelease(t *testing.T) {
func TestAddPod(t *testing.T) { func TestAddPod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -891,7 +891,7 @@ func TestAddPod(t *testing.T) {
func TestAddPodOrphan(t *testing.T) { func TestAddPodOrphan(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -919,7 +919,7 @@ func TestAddPodOrphan(t *testing.T) {
func TestUpdatePod(t *testing.T) { func TestUpdatePod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -968,7 +968,7 @@ func TestUpdatePod(t *testing.T) {
func TestUpdatePodOrphanWithNewLabels(t *testing.T) { func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -995,7 +995,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
func TestUpdatePodChangeControllerRef(t *testing.T) { func TestUpdatePodChangeControllerRef(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -1021,7 +1021,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
func TestUpdatePodRelease(t *testing.T) { func TestUpdatePodRelease(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -1047,7 +1047,7 @@ func TestUpdatePodRelease(t *testing.T) {
func TestDeletePod(t *testing.T) { func TestDeletePod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -1092,7 +1092,7 @@ func TestDeletePod(t *testing.T) {
func TestDeletePodOrphan(t *testing.T) { func TestDeletePodOrphan(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
@ -1132,7 +1132,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// and checking expectations. // and checking expectations.
func TestSyncJobExpectations(t *testing.T) { func TestSyncJobExpectations(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -1166,7 +1166,7 @@ func TestWatchJobs(t *testing.T) {
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
@ -1211,7 +1211,7 @@ func TestWatchPods(t *testing.T) {
clientset := fake.NewSimpleClientset(testJob) clientset := fake.NewSimpleClientset(testJob)
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
@ -1294,7 +1294,7 @@ func TestJobBackoffReset(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -1382,7 +1382,7 @@ func TestJobBackoff(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -1474,7 +1474,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady

View File

@ -21,6 +21,8 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
) )
// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func IsJobFinished(j *batch.Job) bool { func IsJobFinished(j *batch.Job) bool {
for _, c := range j.Status.Conditions { for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue { if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {

View File

@ -37,7 +37,7 @@ import (
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *cronjob.Controller, *job.JobController, informers.SharedInformerFactory, clientset.Interface, restclient.Config) { func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *cronjob.Controller, *job.Controller, informers.SharedInformerFactory, clientset.Interface, restclient.Config) {
masterConfig := framework.NewIntegrationTestMasterConfig() masterConfig := framework.NewIntegrationTestMasterConfig()
_, server, closeFn := framework.RunAMaster(masterConfig) _, server, closeFn := framework.RunAMaster(masterConfig)
@ -52,7 +52,7 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *cronjob.Contro
if err != nil { if err != nil {
t.Fatalf("Error creating CronJob controller: %v", err) t.Fatalf("Error creating CronJob controller: %v", err)
} }
jc := job.NewJobController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) jc := job.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
return server, closeFn, cjc, jc, informerSet, clientSet, config return server, closeFn, cjc, jc, informerSet, clientSet, config
} }