diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 7ed75c68900..d759c75b613 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -144,18 +144,22 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { <-stopCh } -// getPodJob returns the job managing the given pod. -func (jm *JobController) getPodJob(pod *v1.Pod) *batch.Job { +// getPodJobs returns a list of Jobs that potentially match a Pod. +func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job { jobs, err := jm.jobLister.GetPodJobs(pod) if err != nil { - glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name) return nil } if len(jobs) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(byCreationTimestamp(jobs)) } - return &jobs[0] + ret := make([]*batch.Job, 0, len(jobs)) + for i := range jobs { + ret = append(ret, &jobs[i]) + } + return ret } // When a pod is created, enqueue the controller that manages it and update it's expectations. @@ -167,14 +171,32 @@ func (jm *JobController) addPod(obj interface{}) { jm.deletePod(pod) return } - if job := jm.getPodJob(pod); job != nil { + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } jobKey, err := controller.KeyFunc(job) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) return } jm.expectations.CreationObserved(jobKey) jm.enqueueController(job) + return + } + + // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, job := range jm.getPodJobs(pod) { + jm.enqueueController(job) } } @@ -197,15 +219,40 @@ func (jm *JobController) updatePod(old, cur interface{}) { jm.deletePod(curPod) return } - if job := jm.getPodJob(curPod); job != nil { - jm.enqueueController(job) + + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) + + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + job, err := jm.jobLister.Jobs(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + jm.enqueueController(job) + } } - // Only need to get the old job if the labels changed. - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // If the old and new job are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldJob := jm.getPodJob(oldPod); oldJob != nil { - jm.enqueueController(oldJob) + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + job, err := jm.jobLister.Jobs(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + jm.enqueueController(job) + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, job := range jm.getPodJobs(curPod) { + jm.enqueueController(job) } } } @@ -222,24 +269,36 @@ func (jm *JobController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj)) return } } - if job := jm.getPodJob(pod); job != nil { - jobKey, err := controller.KeyFunc(job) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) - return - } - jm.expectations.DeletionObserved(jobKey) - jm.enqueueController(job) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + jm.expectations.DeletionObserved(jobKey) + jm.enqueueController(job) } // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 226f6bd4b85..594cc40451e 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -18,6 +18,7 @@ package job import ( "fmt" + "strconv" "testing" "time" @@ -598,7 +599,11 @@ func TestJobPodLookup(t *testing.T) { } for _, tc := range testCases { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) - if job := manager.getPodJob(tc.pod); job != nil { + if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 { + if got, want := len(jobs), 1; got != want { + t.Errorf("len(jobs) = %v, want %v", got, want) + } + job := jobs[0] if tc.expectedName != job.Name { t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) } @@ -720,6 +725,279 @@ func TestGetPodsForJobRelease(t *testing.T) { } } +func TestAddPod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + jm.addPod(pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + jm.addPod(pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestAddPodOrphan(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + job3 := newJob(1, 1) + job3.Name = "job3" + job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) + + pod1 := newPod("pod1", job1) + // Make pod an orphan. Expect all matching controllers to be queued. + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + jm.addPod(pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + prev := *pod1 + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + prev = *pod2 + bumpResourceVersion(pod2) + jm.updatePod(&prev, pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Labels changed on orphan. Expect newly matching controllers to queue. + prev := *pod1 + prev.Labels = map[string]string{"foo2": "bar2"} + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodChangeControllerRef(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Changed ControllerRef. Expect both old and new to queue. + prev := *pod1 + prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(job2)} + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodRelease(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Remove ControllerRef. Expect all matching to queue for adoption. + prev := *pod1 + pod1.OwnerReferences = nil + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestDeletePod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + jm.deletePod(pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + jm.deletePod(pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestDeletePodOrphan(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + job3 := newJob(1, 1) + job3.Name = "job3" + job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) + + pod1 := newPod("pod1", job1) + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + jm.deletePod(pod1) + if got, want := jm.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + type FakeJobExpectations struct { *controller.ControllerExpectations satisfied bool @@ -855,3 +1133,8 @@ func TestWatchPods(t *testing.T) { t.Log("Waiting for pod to reach syncHandler") <-received } + +func bumpResourceVersion(obj metav1.Object) { + ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) + obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) +}