From bdfe18f6385032666f2eba4a30a15181d7241370 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sun, 26 Feb 2017 17:00:33 -0800 Subject: [PATCH 01/12] Job: Add ControllerRef on all created Pods. --- pkg/controller/job/jobcontroller.go | 5 ++++- pkg/controller/job/jobcontroller_test.go | 23 ++++++++++++++++++++--- pkg/controller/job/utils.go | 12 ++++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index ade0e584088..9c55e45cbb6 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -46,6 +46,9 @@ import ( "github.com/golang/glog" ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = batch.SchemeGroupVersion.WithKind("Job") + type JobController struct { kubeClient clientset.Interface podControl controller.PodControlInterface @@ -507,7 +510,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b for i := int32(0); i < diff; i++ { go func() { defer wait.Done() - if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil { + if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil { defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod jm.expectations.CreationObserved(jobKey) diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 4d3cb6116ce..e4f1ed358e0 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -101,9 +101,10 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod { for i := int32(0); i < count; i++ { newPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%v", rand.String(10)), - Labels: job.Spec.Selector.MatchLabels, - Namespace: job.Namespace, + Name: fmt.Sprintf("pod-%v", rand.String(10)), + Labels: job.Spec.Selector.MatchLabels, + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)}, }, Status: v1.PodStatus{Phase: status}, } @@ -274,6 +275,22 @@ func TestControllerSyncJob(t *testing.T) { if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName)) } + // Each create should have an accompanying ControllerRef. + if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) { + t.Errorf("%s: unexpected number of ControllerRefs. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.ControllerRefs)) + } + // Make sure the ControllerRefs are correct. + for _, controllerRef := range fakePodControl.ControllerRefs { + if got, want := controllerRef.APIVersion, "batch/v1"; got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) + } + if got, want := controllerRef.Kind, "Job"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("controllerRef.Controller is not set to true") + } + } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("%s: unexpected number of active pods. 
Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active) diff --git a/pkg/controller/job/utils.go b/pkg/controller/job/utils.go index 5d14f054c60..47f1cc83d0e 100644 --- a/pkg/controller/job/utils.go +++ b/pkg/controller/job/utils.go @@ -17,6 +17,7 @@ limitations under the License. package job import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/v1" batch "k8s.io/kubernetes/pkg/apis/batch/v1" ) @@ -29,3 +30,14 @@ func IsJobFinished(j *batch.Job) bool { } return false } + +func newControllerRef(j *batch.Job) *metav1.OwnerReference { + isController := true + return &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: j.Name, + UID: j.UID, + Controller: &isController, + } +} From bc423ac39ded41b1e044c77bdceabdd3a18b9271 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sun, 26 Feb 2017 17:04:28 -0800 Subject: [PATCH 02/12] Job: Set DefaultGarbageCollectionPolicy to OrphanDependents. Now that Job adds ControllerRef to Pods it creates, we need to set this default so legacy behavior is maintained. --- pkg/registry/batch/job/BUILD | 2 ++ pkg/registry/batch/job/strategy.go | 7 +++++++ pkg/registry/batch/job/strategy_test.go | 8 ++++++++ 3 files changed, 17 insertions(+) diff --git a/pkg/registry/batch/job/BUILD b/pkg/registry/batch/job/BUILD index 623a34d697f..c4a356516e1 100644 --- a/pkg/registry/batch/job/BUILD +++ b/pkg/registry/batch/job/BUILD @@ -26,6 +26,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library", ], @@ -44,6 +45,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", ], ) diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 4a184a9f69a..4bfdb3213f8 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -43,6 +44,12 @@ type jobStrategy struct { // Strategy is the default logic that applies when creating and updating Replication Controller objects. var Strategy = jobStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (jobStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all jobs need to be within a namespace. 
func (jobStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go index 590afa0e98f..1586c4994d1 100644 --- a/pkg/registry/batch/job/strategy_test.go +++ b/pkg/registry/batch/job/strategy_test.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -100,6 +101,13 @@ func TestJobStrategy(t *testing.T) { if len(errs) == 0 { t.Errorf("Expected a validation error") } + + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. + var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) + } } func TestJobStrategyWithGeneration(t *testing.T) { From 424de527790abb46790958ce3ea4d232aba8993b Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sun, 26 Feb 2017 17:49:00 -0800 Subject: [PATCH 03/12] Job: Use ControllerRefManager to adopt/orphan. --- pkg/controller/job/BUILD | 2 + pkg/controller/job/jobcontroller.go | 23 ++++- pkg/controller/job/jobcontroller_test.go | 115 +++++++++++++++++++++++ 3 files changed, 138 insertions(+), 2 deletions(-) diff --git a/pkg/controller/job/BUILD b/pkg/controller/job/BUILD index 4d535551f3e..e37de5221ba 100644 --- a/pkg/controller/job/BUILD +++ b/pkg/controller/job/BUILD @@ -30,6 +30,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", @@ -59,6 +60,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 9c55e45cbb6..7ed75c68900 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -284,6 +285,24 @@ func (jm *JobController) processNextWorkItem() bool { return true } +// getPodsForJob returns the set of pods that this Job should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// Note that the returned Pods are pointers into the cache. 
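+// Callers must not mutate the returned objects.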
+func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) { + selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("couldn't convert Job selector: %v", err) + } + // List all pods to include those that don't match the selector anymore + // but have a ControllerRef pointing to this controller. + pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind) + return cm.ClaimPods(pods) +} + // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked // concurrently with the same key. @@ -315,8 +334,8 @@ func (jm *JobController) syncJob(key string) error { // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. jobNeedsSync := jm.expectations.SatisfiedExpectations(key) - selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector) - pods, err := jm.podStore.Pods(job.Namespace).List(selector) + + pods, err := jm.getPodsForJob(&job) if err != nil { return err } diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index e4f1ed358e0..226f6bd4b85 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -24,6 +24,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" @@ -44,6 +45,7 @@ func newJob(parallelism, completions int32) *batch.Job { j := &batch.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", + UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, }, Spec: batch.JobSpec{ @@ -91,6 +93,7 @@ func getKey(job *batch.Job, t *testing.T) string { func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) { sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient) + jm.podControl = &controller.FakePodControl{} return jm, sharedInformers } @@ -605,6 +608,118 @@ func TestJobPodLookup(t *testing.T) { } } +func newPod(name string, job *batch.Job) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: job.Spec.Selector.MatchLabels, + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)}, + }, + } +} + +func TestGetPodsForJob(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + 
pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + pod3 := newPod("pod3", job1) + // Make pod3 an orphan that doesn't match. It should be ignored. + pod3.OwnerReferences = nil + pod3.Labels = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, "pod1"; got != want { + t.Errorf("pod.Name = %v, want %v", got, want) + } + + pods, err = jm.getPodsForJob(job2) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, "pod2"; got != want { + t.Errorf("pod.Name = %v, want %v", got, want) + } +} + +func TestGetPodsForJobAdopt(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should still be returned because it's adopted. + pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 2; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } +} + +func TestGetPodsForJobRelease(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod not match, even though it's owned. It should be released. + pod2.Labels = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, "pod1"; got != want { + t.Errorf("pod.Name = %v, want %v", got, want) + } +} + type FakeJobExpectations struct { *controller.ControllerExpectations satisfied bool From 067a8ff3c2271b61289516b177439f45fa7651a8 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sun, 26 Feb 2017 18:25:21 -0800 Subject: [PATCH 04/12] Job: Use ControllerRef to route watch events. 
This is part of the completion of ControllerRef, as described here: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches --- pkg/controller/job/jobcontroller.go | 109 +++++++-- pkg/controller/job/jobcontroller_test.go | 285 ++++++++++++++++++++++- 2 files changed, 368 insertions(+), 26 deletions(-) diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 7ed75c68900..d759c75b613 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -144,18 +144,22 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { <-stopCh } -// getPodJob returns the job managing the given pod. -func (jm *JobController) getPodJob(pod *v1.Pod) *batch.Job { +// getPodJobs returns a list of Jobs that potentially match a Pod. +func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job { jobs, err := jm.jobLister.GetPodJobs(pod) if err != nil { - glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name) return nil } if len(jobs) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(byCreationTimestamp(jobs)) } - return &jobs[0] + ret := make([]*batch.Job, 0, len(jobs)) + for i := range jobs { + ret = append(ret, &jobs[i]) + } + return ret } // When a pod is created, enqueue the controller that manages it and update it's expectations. @@ -167,14 +171,32 @@ func (jm *JobController) addPod(obj interface{}) { jm.deletePod(pod) return } - if job := jm.getPodJob(pod); job != nil { + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } jobKey, err := controller.KeyFunc(job) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) return } jm.expectations.CreationObserved(jobKey) jm.enqueueController(job) + return + } + + // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, job := range jm.getPodJobs(pod) { + jm.enqueueController(job) } } @@ -197,15 +219,40 @@ func (jm *JobController) updatePod(old, cur interface{}) { jm.deletePod(curPod) return } - if job := jm.getPodJob(curPod); job != nil { - jm.enqueueController(job) + + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) + + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + job, err := jm.jobLister.Jobs(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + jm.enqueueController(job) + } } - // Only need to get the old job if the labels changed. 
- if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // If the old and new job are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldJob := jm.getPodJob(oldPod); oldJob != nil { - jm.enqueueController(oldJob) + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + job, err := jm.jobLister.Jobs(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + jm.enqueueController(job) + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, job := range jm.getPodJobs(curPod) { + jm.enqueueController(job) } } } @@ -222,24 +269,36 @@ func (jm *JobController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj)) return } } - if job := jm.getPodJob(pod); job != nil { - jobKey, err := controller.KeyFunc(job) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) - return - } - jm.expectations.DeletionObserved(jobKey) - jm.enqueueController(job) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + jm.expectations.DeletionObserved(jobKey) + jm.enqueueController(job) } // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. 
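All three handlers above key off the Pod's ControllerRef rather than a label
lookup. For reference, controller.GetControllerOf amounts to the following
(a simplified sketch assuming the usual metav1 import; the canonical helper
lives in pkg/controller):

    // getControllerOf returns the OwnerReference that is marked as the
    // managing controller, or nil if there is none.
    func getControllerOf(obj metav1.Object) *metav1.OwnerReference {
        refs := obj.GetOwnerReferences()
        for i := range refs {
            if refs[i].Controller != nil && *refs[i].Controller {
                return &refs[i]
            }
        }
        return nil
    }

API validation allows at most one OwnerReference with Controller set to true,
which is what makes this routing unambiguous.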
diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 226f6bd4b85..594cc40451e 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -18,6 +18,7 @@ package job import ( "fmt" + "strconv" "testing" "time" @@ -598,7 +599,11 @@ func TestJobPodLookup(t *testing.T) { } for _, tc := range testCases { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) - if job := manager.getPodJob(tc.pod); job != nil { + if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 { + if got, want := len(jobs), 1; got != want { + t.Errorf("len(jobs) = %v, want %v", got, want) + } + job := jobs[0] if tc.expectedName != job.Name { t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) } @@ -720,6 +725,279 @@ func TestGetPodsForJobRelease(t *testing.T) { } } +func TestAddPod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + jm.addPod(pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + jm.addPod(pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestAddPodOrphan(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + job3 := newJob(1, 1) + job3.Name = "job3" + job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) + + pod1 := newPod("pod1", job1) + // Make pod an orphan. Expect all matching controllers to be queued. 
+ pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + jm.addPod(pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + prev := *pod1 + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + prev = *pod2 + bumpResourceVersion(pod2) + jm.updatePod(&prev, pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Labels changed on orphan. Expect newly matching controllers to queue. 
+ prev := *pod1 + prev.Labels = map[string]string{"foo2": "bar2"} + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodChangeControllerRef(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Changed ControllerRef. Expect both old and new to queue. + prev := *pod1 + prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(job2)} + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodRelease(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Remove ControllerRef. Expect all matching to queue for adoption. 
+ prev := *pod1 + pod1.OwnerReferences = nil + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestDeletePod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + jm.deletePod(pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + jm.deletePod(pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestDeletePodOrphan(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + job3 := newJob(1, 1) + job3.Name = "job3" + job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) + + pod1 := newPod("pod1", job1) + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + jm.deletePod(pod1) + if got, want := jm.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + type FakeJobExpectations struct { *controller.ControllerExpectations satisfied bool @@ -855,3 +1133,8 @@ func TestWatchPods(t *testing.T) { t.Log("Waiting for pod to reach syncHandler") <-received } + +func bumpResourceVersion(obj metav1.Object) { + ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) + obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) +} From ad026026e0252469bab05f9adba4bb1b20a8f6a9 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sun, 26 Feb 2017 18:28:26 -0800 Subject: [PATCH 05/12] Job: Update Lister documentation for ControllerRef. The Job Listers still use selectors, because this is the behavior expected by callers. 
This clarifies the meaning of the returned list. Some callers may need to switch to using GetControllerOf() instead, but that is a separate, case-by-case issue. --- .../listers/batch/internalversion/job_expansion.go | 12 ++++++++---- pkg/client/listers/batch/v1/job_expansion.go | 12 ++++++++---- .../client-go/listers/batch/v1/job_expansion.go | 12 ++++++++---- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/client/listers/batch/internalversion/job_expansion.go b/pkg/client/listers/batch/internalversion/job_expansion.go index 0b19638b612..1687f88198f 100644 --- a/pkg/client/listers/batch/internalversion/job_expansion.go +++ b/pkg/client/listers/batch/internalversion/job_expansion.go @@ -28,13 +28,17 @@ import ( // JobListerExpansion allows custom methods to be added to // JobLister. type JobListerExpansion interface { - // GetPodJobs returns a list of jobs managing a pod. An error is returned only - // if no matching jobs are found. + // GetPodJobs returns a list of Jobs that potentially + // match a Pod. Only the one specified in the Pod's ControllerRef + // will actually manage it. + // Returns an error only if no matching Jobs are found. GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) } -// GetPodJobs returns a list of jobs managing a pod. An error is returned only -// if no matching jobs are found. +// GetPodJobs returns a list of Jobs that potentially +// match a Pod. Only the one specified in the Pod's ControllerRef +// will actually manage it. +// Returns an error only if no matching Jobs are found. func (l *jobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) { if len(pod.Labels) == 0 { err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) diff --git a/pkg/client/listers/batch/v1/job_expansion.go b/pkg/client/listers/batch/v1/job_expansion.go index 8915e73415e..deaafd92a02 100644 --- a/pkg/client/listers/batch/v1/job_expansion.go +++ b/pkg/client/listers/batch/v1/job_expansion.go @@ -28,13 +28,17 @@ import ( // JobListerExpansion allows custom methods to be added to // JobLister. type JobListerExpansion interface { - // GetPodJobs returns a list of jobs managing a pod. An error is returned only - // if no matching jobs are found. + // GetPodJobs returns a list of Jobs that potentially + // match a Pod. Only the one specified in the Pod's ControllerRef + // will actually manage it. + // Returns an error only if no matching Jobs are found. GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) } -// GetPodJobs returns a list of jobs managing a pod. An error is returned only -// if no matching jobs are found. +// GetPodJobs returns a list of Jobs that potentially +// match a Pod. Only the one specified in the Pod's ControllerRef +// will actually manage it. +// Returns an error only if no matching Jobs are found. func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) { if len(pod.Labels) == 0 { err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) diff --git a/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go b/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go index 4b602d0aeb3..8c285e8daf5 100644 --- a/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go +++ b/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go @@ -28,13 +28,17 @@ import ( // JobListerExpansion allows custom methods to be added to // JobLister. type JobListerExpansion interface { - // GetPodJobs returns a list of jobs managing a pod. 
An error is returned only - // if no matching jobs are found. + // GetPodJobs returns a list of Jobs that potentially + // match a Pod. Only the one specified in the Pod's ControllerRef + // will actually manage it. + // Returns an error only if no matching Jobs are found. GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) } -// GetPodJobs returns a list of jobs managing a pod. An error is returned only -// if no matching jobs are found. +// GetPodJobs returns a list of Jobs that potentially +// match a Pod. Only the one specified in the Pod's ControllerRef +// will actually manage it. +// Returns an error only if no matching Jobs are found. func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) { if len(pod.Labels) == 0 { err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) From c82b537bee2d50b8f6c9ca79217ab65a43363a60 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 2 Mar 2017 11:01:05 -0800 Subject: [PATCH 06/12] Job: Always set BlockOwnerDeletion in ControllerRef. --- pkg/controller/job/utils.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/controller/job/utils.go b/pkg/controller/job/utils.go index 47f1cc83d0e..6e8be3d76a4 100644 --- a/pkg/controller/job/utils.go +++ b/pkg/controller/job/utils.go @@ -32,12 +32,14 @@ func IsJobFinished(j *batch.Job) bool { } func newControllerRef(j *batch.Job) *metav1.OwnerReference { + blockOwnerDeletion := true isController := true return &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, - Name: j.Name, - UID: j.UID, - Controller: &isController, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: j.Name, + UID: j.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, } } From e207f6c767ca772ee4e68fa9e24a8333195eb26e Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Fri, 3 Mar 2017 14:13:12 -0800 Subject: [PATCH 07/12] Job: Fix CronJob e2e test for async Job deletion. Now that the default delete option for Job is OrphanDependents, Job deletion is asynchronous. 
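A deleted Job now lingers, with its DeletionTimestamp set, until the garbage
collector (or the test) finishes the cleanup, so the helpers below filter such
Jobs out instead of assuming they are gone. A test that needs the Job to be
fully deleted would have to poll for it, along these lines (a sketch reusing
framework helpers seen elsewhere in the e2e suite; names are illustrative):

    // Wait until the deleted Job actually disappears from the API server.
    err := wait.Poll(framework.Poll, framework.JobTimeout, func() (bool, error) {
        _, err := c.Batch().Jobs(ns).Get(jobName, metav1.GetOptions{})
        if errors.IsNotFound(err) {
            return true, nil // deletion completed
        }
        // Keep polling while the Job still exists; abort on any other error.
        return false, err
    })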
--- test/e2e/cronjob.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/test/e2e/cronjob.go b/test/e2e/cronjob.go index f28fa3b25da..e62661022e9 100644 --- a/test/e2e/cronjob.go +++ b/test/e2e/cronjob.go @@ -124,7 +124,7 @@ var _ = framework.KubeDescribe("CronJob", func() { Expect(err).NotTo(HaveOccurred()) Expect(cronJob.Status.Active).Should(HaveLen(1)) - By("Ensuring exaclty one running job exists by listing jobs explicitly") + By("Ensuring exactly one running job exists by listing jobs explicitly") jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) activeJobs, _ := filterActiveJobs(jobs) @@ -156,7 +156,7 @@ var _ = framework.KubeDescribe("CronJob", func() { Expect(err).NotTo(HaveOccurred()) Expect(cronJob.Status.Active).Should(HaveLen(1)) - By("Ensuring exaclty one running job exists by listing jobs explicitly") + By("Ensuring exactly one running job exists by listing jobs explicitly") jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) activeJobs, _ := filterActiveJobs(jobs) @@ -394,13 +394,15 @@ func waitForJobReplaced(c clientset.Interface, ns, previousJobName string) error if err != nil { return false, err } - if len(jobs.Items) > 1 { + // Ignore Jobs pending deletion, since deletion of Jobs is now asynchronous. + aliveJobs := filterNotDeletedJobs(jobs) + if len(aliveJobs) > 1 { return false, fmt.Errorf("More than one job is running %+v", jobs.Items) - } else if len(jobs.Items) == 0 { + } else if len(aliveJobs) == 0 { framework.Logf("Warning: Found 0 jobs in namespace %v", ns) return false, nil } - return jobs.Items[0].Name != previousJobName, nil + return aliveJobs[0].Name != previousJobName, nil }) } @@ -451,6 +453,19 @@ func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reaso return nil } +// filterNotDeletedJobs returns the job list without any jobs that are pending +// deletion. +func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job { + var alive []*batchv1.Job + for i := range jobs.Items { + job := &jobs.Items[i] + if job.DeletionTimestamp == nil { + alive = append(alive, job) + } + } + return alive +} + func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) { for i := range jobs.Items { j := jobs.Items[i] From 06536cb3570b21d6dbda135a8e2002515f375ea4 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Mon, 6 Mar 2017 11:10:03 -0800 Subject: [PATCH 08/12] Job: Check that ControllerRef UID matches. --- pkg/controller/job/jobcontroller.go | 52 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index d759c75b613..1f32676482d 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -162,6 +162,27 @@ func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job { return ret } +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the corrrect Kind. +func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. 
+ if controllerRef.Kind != controllerKind.Kind { + return nil + } + job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if job.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return job +} + // When a pod is created, enqueue the controller that manages it and update it's expectations. func (jm *JobController) addPod(obj interface{}) { pod := obj.(*v1.Pod) @@ -174,12 +195,8 @@ func (jm *JobController) addPod(obj interface{}) { // If it has a ControllerRef, that's all that matters. if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { - if controllerRef.Kind != controllerKind.Kind { - // It's controlled by a different type of controller. - return - } - job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name) - if err != nil { + job := jm.resolveControllerRef(pod.Namespace, controllerRef) + if job == nil { return } jobKey, err := controller.KeyFunc(job) @@ -225,23 +242,17 @@ func (jm *JobController) updatePod(old, cur interface{}) { curControllerRef := controller.GetControllerOf(curPod) oldControllerRef := controller.GetControllerOf(oldPod) controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) - if controllerRefChanged && - oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. - job, err := jm.jobLister.Jobs(oldPod.Namespace).Get(oldControllerRef.Name) - if err == nil { + if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { jm.enqueueController(job) } } // If it has a ControllerRef, that's all that matters. if curControllerRef != nil { - if curControllerRef.Kind != controllerKind.Kind { - // It's controlled by a different type of controller. - return - } - job, err := jm.jobLister.Jobs(curPod.Namespace).Get(curControllerRef.Name) - if err != nil { + job := jm.resolveControllerRef(curPod.Namespace, curControllerRef) + if job == nil { return } jm.enqueueController(job) @@ -284,13 +295,8 @@ func (jm *JobController) deletePod(obj interface{}) { // No controller should care about orphans being deleted. return } - if controllerRef.Kind != controllerKind.Kind { - // It's controlled by a different type of controller. - return - } - - job, err := jm.jobLister.Jobs(pod.Namespace).Get(controllerRef.Name) - if err != nil { + job := jm.resolveControllerRef(pod.Namespace, controllerRef) + if job == nil { return } jobKey, err := controller.KeyFunc(job) From fd8dd26d09456748b6b62ae1cae9991830d9d673 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Mon, 6 Mar 2017 11:12:23 -0800 Subject: [PATCH 09/12] Job: Check ControllerRef Name and UID in unit test. 
--- pkg/controller/job/jobcontroller_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 594cc40451e..0376668f881 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -291,6 +291,12 @@ func TestControllerSyncJob(t *testing.T) { if got, want := controllerRef.Kind, "Job"; got != want { t.Errorf("controllerRef.Kind = %q, want %q", got, want) } + if got, want := controllerRef.Name, job.Name; got != want { + t.Errorf("controllerRef.Name = %q, want %q", got, want) + } + if got, want := controllerRef.UID, job.UID; got != want { + t.Errorf("controllerRef.UID = %q, want %q", got, want) + } if controllerRef.Controller == nil || *controllerRef.Controller != true { t.Errorf("controllerRef.Controller is not set to true") } From f502ab6a8b9621c5a8d01f3ed7f1a0af03406eca Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 12 Apr 2017 15:21:27 -0700 Subject: [PATCH 10/12] Job: Recheck DeletionTimestamp before adoption. This is necessary to avoid racing with the GC when it orphans dependents. --- pkg/controller/job/jobcontroller.go | 14 ++++- pkg/controller/job/jobcontroller_test.go | 70 +++++++++++++++++++++++- 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 1f32676482d..d818f0e85b4 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -364,7 +364,19 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) { if err != nil { return nil, err } - cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind) + // If any adoptions are attempted, we should first recheck for deletion + // with an uncached quorum read sometime after listing Pods (see #42639). 
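+	// RecheckDeletionTimestamp refuses the adoption if the fresh copy is already being deleted.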
+ canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != j.UID { + return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID) + } + return fresh, nil + }) + cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc) return cm.ClaimPods(pods) } diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 0376668f881..0eadda6f14c 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -677,13 +677,13 @@ func TestGetPodsForJob(t *testing.T) { } func TestGetPodsForJobAdopt(t *testing.T) { - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + job1 := newJob(1, 1) + job1.Name = "job1" + clientset := fake.NewSimpleClientset(job1) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) - job1.Name = "job1" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) pod1 := newPod("pod1", job1) @@ -702,6 +702,70 @@ func TestGetPodsForJobAdopt(t *testing.T) { } } +func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + job1.DeletionTimestamp = &metav1.Time{} + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should not be adopted because the Job is being deleted. + pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, pod1.Name; got != want { + t.Errorf("pod.Name = %q, want %q", got, want) + } +} + +func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + // The up-to-date object says it's being deleted. + job1.DeletionTimestamp = &metav1.Time{} + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + // The cache says it's NOT being deleted. + cachedJob := *job1 + cachedJob.DeletionTimestamp = nil + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(&cachedJob) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should not be adopted because the Job is being deleted. 
+ pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, pod1.Name; got != want { + t.Errorf("pod.Name = %q, want %q", got, want) + } +} + func TestGetPodsForJobRelease(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) From aa7bc258124a4b1612855c102f38eebe5bd4a169 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 12 Apr 2017 15:42:09 -0700 Subject: [PATCH 11/12] Job: Add PATCH Pods permission. This is needed to update ControllerRef during adopt/release. --- .../auth/authorizer/rbac/bootstrappolicy/controller_policy.go | 2 +- .../rbac/bootstrappolicy/testdata/controller-roles.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index ee19297dd33..b2b0cbd682e 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -152,7 +152,7 @@ func init() { Rules: []rbac.PolicyRule{ rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("jobs").RuleOrDie(), rbac.NewRule("update").Groups(batchGroup).Resources("jobs/status").RuleOrDie(), - rbac.NewRule("list", "watch", "create", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), + rbac.NewRule("list", "watch", "create", "delete", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(), eventsRule(), }, }) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index e331d7d3f55..3ae6c71881c 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -492,6 +492,7 @@ items: - create - delete - list + - patch - watch - apiGroups: - "" From d5b86bbae464dc916daa4714d00163556ef5dfd6 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 12 Apr 2017 15:57:38 -0700 Subject: [PATCH 12/12] Job: Add e2e test for Pod adopt/release. An e2e test is currently the only way to ensure we have the correct RBAC permissions to PATCH Pods. 
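For reference, adopt and release each amount to one strategic merge patch
against the Pod, roughly of the following shape (a sketch; the exact payloads
are built by the PodControllerRefManager, and the values here are
illustrative):

    // Adopt: add a controller OwnerReference, guarded by the Pod's UID.
    adopt := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"apiVersion":"batch/v1","kind":"Job","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
        job.Name, job.UID, pod.UID)

    // Release: delete the owner reference by UID, again guarded by the Pod's UID.
    release := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`,
        job.UID, pod.UID)

The trailing "uid" field makes either patch fail if the Pod was deleted and
recreated under the same name in the meantime.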
--- test/e2e/framework/jobs_util.go | 14 ++++++-- test/e2e/job.go | 57 +++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/test/e2e/framework/jobs_util.go b/test/e2e/framework/jobs_util.go index 0862e615284..68f4b8b4068 100644 --- a/test/e2e/framework/jobs_util.go +++ b/test/e2e/framework/jobs_util.go @@ -46,6 +46,9 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl ObjectMeta: metav1.ObjectMeta{ Name: name, }, + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + }, Spec: batch.JobSpec{ Parallelism: ¶llelism, Completions: &completions, @@ -126,13 +129,18 @@ func DeleteJob(c clientset.Interface, ns, name string) error { return c.Batch().Jobs(ns).Delete(name, nil) } +// GetJobPods returns a list of Pods belonging to a Job. +func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) { + label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName})) + options := metav1.ListOptions{LabelSelector: label.String()} + return c.CoreV1().Pods(ns).List(options) +} + // WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use // when pods will run for a long time, or it will be racy. func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error { - label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName})) return wait.Poll(Poll, JobTimeout, func() (bool, error) { - options := metav1.ListOptions{LabelSelector: label.String()} - pods, err := c.Core().Pods(ns).List(options) + pods, err := GetJobPods(c, ns, jobName) if err != nil { return false, err } diff --git a/test/e2e/job.go b/test/e2e/job.go index 12bf5f09f60..08212bf42d4 100644 --- a/test/e2e/job.go +++ b/test/e2e/job.go @@ -26,8 +26,11 @@ import ( "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/test/e2e/framework" + "fmt" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/controller" ) var _ = framework.KubeDescribe("Job", func() { @@ -106,4 +109,58 @@ var _ = framework.KubeDescribe("Job", func() { Expect(err).To(HaveOccurred()) Expect(errors.IsNotFound(err)).To(BeTrue()) }) + + It("should adopt matching orphans and release non-matching pods", func() { + By("Creating a job") + job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions) + // Replace job with the one returned from Create() so it has the UID. + // Save Kind since it won't be populated in the returned job. 
+ kind := job.Kind + job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) + Expect(err).NotTo(HaveOccurred()) + job.Kind = kind + + By("Ensuring active pods == parallelism") + err = framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) + Expect(err).NotTo(HaveOccurred()) + + By("Orphaning one of the Job's Pods") + pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(int(parallelism))) + pod := pods.Items[0] + f.PodClient().Update(pod.Name, func(pod *v1.Pod) { + pod.OwnerReferences = nil + }) + + By("Checking that the Job readopts the Pod") + Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", framework.JobTimeout, + func(pod *v1.Pod) (bool, error) { + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + return false, nil + } + if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID { + return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job) + } + return true, nil + }, + )).To(Succeed(), "wait for pod %q to be readopted", pod.Name) + + By("Removing the labels from the Job's Pod") + f.PodClient().Update(pod.Name, func(pod *v1.Pod) { + pod.Labels = nil + }) + + By("Checking that the Job releases the Pod") + Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", framework.JobTimeout, + func(pod *v1.Pod) (bool, error) { + controllerRef := controller.GetControllerOf(pod) + if controllerRef != nil { + return false, nil + } + return true, nil + }, + )).To(Succeed(), "wait for pod %q to be released", pod.Name) + }) })
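Taken together with the BlockOwnerDeletion change earlier in this series, a
successfully adopted Pod carries exactly one controller reference. In terms of
the API types, the condition functions above check the equivalent of the
following (illustrative sketch; boolPtr is a hypothetical helper):

    func boolPtr(b bool) *bool { return &b }

    want := metav1.OwnerReference{
        APIVersion:         "batch/v1",
        Kind:               "Job",
        Name:               job.Name,
        UID:                job.UID,
        Controller:         boolPtr(true),
        BlockOwnerDeletion: boolPtr(true),
    }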