Job: Use ControllerRef to route watch events.

This is part of the completion of ControllerRef, as described here:

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches
This commit is contained in:
Anthony Yeh 2017-02-26 18:25:21 -08:00
parent 424de52779
commit 067a8ff3c2
2 changed files with 368 additions and 26 deletions

View File

@ -144,18 +144,22 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
<-stopCh
}
// getPodJob returns the job managing the given pod.
func (jm *JobController) getPodJob(pod *v1.Pod) *batch.Job {
// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
jobs, err := jm.jobLister.GetPodJobs(pod)
if err != nil {
glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
return nil
}
if len(jobs) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
sort.Sort(byCreationTimestamp(jobs))
}
return &jobs[0]
ret := make([]*batch.Job, 0, len(jobs))
for i := range jobs {
ret = append(ret, &jobs[i])
}
return ret
}
// When a pod is created, enqueue the controller that manages it and update it's expectations.
@ -167,14 +171,32 @@ func (jm *JobController) addPod(obj interface{}) {
jm.deletePod(pod)
return
}
if job := jm.getPodJob(pod); job != nil {
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
return
}
// Otherwise, it's an orphan. Get a list of all matching controllers and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueController(job)
}
}
@ -197,15 +219,40 @@ func (jm *JobController) updatePod(old, cur interface{}) {
jm.deletePod(curPod)
return
}
if job := jm.getPodJob(curPod); job != nil {
jm.enqueueController(job)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged &&
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
// The ControllerRef was changed. Sync the old controller, if any.
job, err := jm.jobLister.Jobs(oldPod.Namespace).Get(oldControllerRef.Name)
if err == nil {
jm.enqueueController(job)
}
}
// Only need to get the old job if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
// If the old and new job are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldJob := jm.getPodJob(oldPod); oldJob != nil {
jm.enqueueController(oldJob)
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
if curControllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
job, err := jm.jobLister.Jobs(curPod.Namespace).Get(curControllerRef.Name)
if err != nil {
return
}
jm.enqueueController(job)
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueController(job)
}
}
}
@ -222,24 +269,36 @@ func (jm *JobController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj))
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
return
}
}
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
}
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.

View File

@ -18,6 +18,7 @@ package job
import (
"fmt"
"strconv"
"testing"
"time"
@ -598,7 +599,11 @@ func TestJobPodLookup(t *testing.T) {
}
for _, tc := range testCases {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job)
if job := manager.getPodJob(tc.pod); job != nil {
if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 {
if got, want := len(jobs), 1; got != want {
t.Errorf("len(jobs) = %v, want %v", got, want)
}
job := jobs[0]
if tc.expectedName != job.Name {
t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
}
@ -720,6 +725,279 @@ func TestGetPodsForJobRelease(t *testing.T) {
}
}
func TestAddPod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
jm.addPod(pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(job1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
jm.addPod(pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(job2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestAddPodOrphan(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
job3 := newJob(1, 1)
job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
pod1 := newPod("pod1", job1)
// Make pod an orphan. Expect all matching controllers to be queued.
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
jm.addPod(pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
prev := *pod1
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(job1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
prev = *pod2
bumpResourceVersion(pod2)
jm.updatePod(&prev, pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(job2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
// Labels changed on orphan. Expect newly matching controllers to queue.
prev := *pod1
prev.Labels = map[string]string{"foo2": "bar2"}
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePodChangeControllerRef(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
// Changed ControllerRef. Expect both old and new to queue.
prev := *pod1
prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(job2)}
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePodRelease(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
// Remove ControllerRef. Expect all matching to queue for adoption.
prev := *pod1
pod1.OwnerReferences = nil
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestDeletePod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
jm.deletePod(pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(job1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
jm.deletePod(pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(job2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestDeletePodOrphan(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
job3 := newJob(1, 1)
job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
pod1 := newPod("pod1", job1)
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
jm.deletePod(pod1)
if got, want := jm.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
type FakeJobExpectations struct {
*controller.ControllerExpectations
satisfied bool
@ -855,3 +1133,8 @@ func TestWatchPods(t *testing.T) {
t.Log("Waiting for pod to reach syncHandler")
<-received
}
func bumpResourceVersion(obj metav1.Object) {
ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}