diff --git a/pkg/client/listers/batch/internalversion/job_expansion.go b/pkg/client/listers/batch/internalversion/job_expansion.go index 0b19638b612..1687f88198f 100644 --- a/pkg/client/listers/batch/internalversion/job_expansion.go +++ b/pkg/client/listers/batch/internalversion/job_expansion.go @@ -28,13 +28,17 @@ import ( // JobListerExpansion allows custom methods to be added to // JobLister. type JobListerExpansion interface { - // GetPodJobs returns a list of jobs managing a pod. An error is returned only - // if no matching jobs are found. + // GetPodJobs returns a list of Jobs that potentially + // match a Pod. Only the one specified in the Pod's ControllerRef + // will actually manage it. + // Returns an error only if no matching Jobs are found. GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) } -// GetPodJobs returns a list of jobs managing a pod. An error is returned only -// if no matching jobs are found. +// GetPodJobs returns a list of Jobs that potentially +// match a Pod. Only the one specified in the Pod's ControllerRef +// will actually manage it. +// Returns an error only if no matching Jobs are found. func (l *jobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) { if len(pod.Labels) == 0 { err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) diff --git a/pkg/client/listers/batch/v1/job_expansion.go b/pkg/client/listers/batch/v1/job_expansion.go index 8915e73415e..deaafd92a02 100644 --- a/pkg/client/listers/batch/v1/job_expansion.go +++ b/pkg/client/listers/batch/v1/job_expansion.go @@ -28,13 +28,17 @@ import ( // JobListerExpansion allows custom methods to be added to // JobLister. type JobListerExpansion interface { - // GetPodJobs returns a list of jobs managing a pod. An error is returned only - // if no matching jobs are found. + // GetPodJobs returns a list of Jobs that potentially + // match a Pod. Only the one specified in the Pod's ControllerRef + // will actually manage it. + // Returns an error only if no matching Jobs are found. GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) } -// GetPodJobs returns a list of jobs managing a pod. An error is returned only -// if no matching jobs are found. +// GetPodJobs returns a list of Jobs that potentially +// match a Pod. Only the one specified in the Pod's ControllerRef +// will actually manage it. +// Returns an error only if no matching Jobs are found. 
func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) { if len(pod.Labels) == 0 { err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) diff --git a/pkg/controller/job/BUILD b/pkg/controller/job/BUILD index 4d535551f3e..e37de5221ba 100644 --- a/pkg/controller/job/BUILD +++ b/pkg/controller/job/BUILD @@ -30,6 +30,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", @@ -59,6 +60,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index ade0e584088..d818f0e85b4 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -46,6 +47,9 @@ import ( "github.com/golang/glog" ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = batch.SchemeGroupVersion.WithKind("Job") + type JobController struct { kubeClient clientset.Interface podControl controller.PodControlInterface @@ -140,18 +144,43 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { <-stopCh } -// getPodJob returns the job managing the given pod. -func (jm *JobController) getPodJob(pod *v1.Pod) *batch.Job { +// getPodJobs returns a list of Jobs that potentially match a Pod. +func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job { jobs, err := jm.jobLister.GetPodJobs(pod) if err != nil { - glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name) return nil } if len(jobs) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(byCreationTimestamp(jobs)) } - return &jobs[0] + ret := make([]*batch.Job, 0, len(jobs)) + for i := range jobs { + ret = append(ret, &jobs[i]) + } + return ret +} + +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the correct Kind. +func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. 
+ if controllerRef.Kind != controllerKind.Kind { + return nil + } + job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if job.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return job } // When a pod is created, enqueue the controller that manages it and update it's expectations. @@ -163,14 +192,28 @@ func (jm *JobController) addPod(obj interface{}) { jm.deletePod(pod) return } - if job := jm.getPodJob(pod); job != nil { + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + job := jm.resolveControllerRef(pod.Namespace, controllerRef) + if job == nil { + return + } jobKey, err := controller.KeyFunc(job) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) return } jm.expectations.CreationObserved(jobKey) jm.enqueueController(job) + return + } + + // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, job := range jm.getPodJobs(pod) { + jm.enqueueController(job) } } @@ -193,15 +236,34 @@ func (jm *JobController) updatePod(old, cur interface{}) { jm.deletePod(curPod) return } - if job := jm.getPodJob(curPod); job != nil { - jm.enqueueController(job) + + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) + + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { + jm.enqueueController(job) + } } - // Only need to get the old job if the labels changed. - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // If the old and new job are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldJob := jm.getPodJob(oldPod); oldJob != nil { - jm.enqueueController(oldJob) + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + job := jm.resolveControllerRef(curPod.Namespace, curControllerRef) + if job == nil { + return + } + jm.enqueueController(job) + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. 
+ if labelChanged || controllerRefChanged { + for _, job := range jm.getPodJobs(curPod) { + jm.enqueueController(job) } } } @@ -218,24 +280,31 @@ func (jm *JobController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj)) return } } - if job := jm.getPodJob(pod); job != nil { - jobKey, err := controller.KeyFunc(job) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) - return - } - jm.expectations.DeletionObserved(jobKey) - jm.enqueueController(job) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + job := jm.resolveControllerRef(pod.Namespace, controllerRef) + if job == nil { + return + } + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + jm.expectations.DeletionObserved(jobKey) + jm.enqueueController(job) } // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. @@ -281,6 +350,36 @@ func (jm *JobController) processNextWorkItem() bool { return true } +// getPodsForJob returns the set of pods that this Job should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// Note that the returned Pods are pointers into the cache. +func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) { + selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("couldn't convert Job selector: %v", err) + } + // List all pods to include those that don't match the selector anymore + // but have a ControllerRef pointing to this controller. + pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + // If any adoptions are attempted, we should first recheck for deletion + // with an uncached quorum read sometime after listing Pods (see #42639). + canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != j.UID { + return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID) + } + return fresh, nil + }) + cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc) + return cm.ClaimPods(pods) +} + // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked // concurrently with the same key. @@ -312,8 +411,8 @@ func (jm *JobController) syncJob(key string) error { // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. 
jobNeedsSync := jm.expectations.SatisfiedExpectations(key) - selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector) - pods, err := jm.podStore.Pods(job.Namespace).List(selector) + + pods, err := jm.getPodsForJob(&job) if err != nil { return err } @@ -507,7 +606,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b for i := int32(0); i < diff; i++ { go func() { defer wait.Done() - if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil { + if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil { defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod jm.expectations.CreationObserved(jobKey) diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 4d3cb6116ce..0eadda6f14c 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -18,12 +18,14 @@ package job import ( "fmt" + "strconv" "testing" "time" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" @@ -44,6 +46,7 @@ func newJob(parallelism, completions int32) *batch.Job { j := &batch.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", + UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, }, Spec: batch.JobSpec{ @@ -91,6 +94,7 @@ func getKey(job *batch.Job, t *testing.T) string { func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) { sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient) + jm.podControl = &controller.FakePodControl{} return jm, sharedInformers } @@ -101,9 +105,10 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod { for i := int32(0); i < count; i++ { newPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%v", rand.String(10)), - Labels: job.Spec.Selector.MatchLabels, - Namespace: job.Namespace, + Name: fmt.Sprintf("pod-%v", rand.String(10)), + Labels: job.Spec.Selector.MatchLabels, + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)}, }, Status: v1.PodStatus{Phase: status}, } @@ -274,6 +279,28 @@ func TestControllerSyncJob(t *testing.T) { if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName)) } + // Each create should have an accompanying ControllerRef. + if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) { + t.Errorf("%s: unexpected number of ControllerRefs. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.ControllerRefs)) + } + // Make sure the ControllerRefs are correct. 
+ for _, controllerRef := range fakePodControl.ControllerRefs { + if got, want := controllerRef.APIVersion, "batch/v1"; got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) + } + if got, want := controllerRef.Kind, "Job"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) + } + if got, want := controllerRef.Name, job.Name; got != want { + t.Errorf("controllerRef.Name = %q, want %q", got, want) + } + if got, want := controllerRef.UID, job.UID; got != want { + t.Errorf("controllerRef.UID = %q, want %q", got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("controllerRef.Controller is not set to true") + } + } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active) @@ -578,7 +605,11 @@ func TestJobPodLookup(t *testing.T) { } for _, tc := range testCases { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) - if job := manager.getPodJob(tc.pod); job != nil { + if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 { + if got, want := len(jobs), 1; got != want { + t.Errorf("len(jobs) = %v, want %v", got, want) + } + job := jobs[0] if tc.expectedName != job.Name { t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) } @@ -588,6 +619,455 @@ func TestJobPodLookup(t *testing.T) { } } +func newPod(name string, job *batch.Job) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: job.Spec.Selector.MatchLabels, + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)}, + }, + } +} + +func TestGetPodsForJob(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + pod3 := newPod("pod3", job1) + // Make pod3 an orphan that doesn't match. It should be ignored. 
+ pod3.OwnerReferences = nil + pod3.Labels = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, "pod1"; got != want { + t.Errorf("pod.Name = %v, want %v", got, want) + } + + pods, err = jm.getPodsForJob(job2) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, "pod2"; got != want { + t.Errorf("pod.Name = %v, want %v", got, want) + } +} + +func TestGetPodsForJobAdopt(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should still be returned because it's adopted. + pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 2; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } +} + +func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + job1.DeletionTimestamp = &metav1.Time{} + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should not be adopted because the Job is being deleted. + pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, pod1.Name; got != want { + t.Errorf("pod.Name = %q, want %q", got, want) + } +} + +func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + // The up-to-date object says it's being deleted. + job1.DeletionTimestamp = &metav1.Time{} + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + // The cache says it's NOT being deleted. + cachedJob := *job1 + cachedJob.DeletionTimestamp = nil + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(&cachedJob) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should not be adopted because the Job is being deleted. 
+ pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, pod1.Name; got != want { + t.Errorf("pod.Name = %q, want %q", got, want) + } +} + +func TestGetPodsForJobRelease(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod not match, even though it's owned. It should be released. + pod2.Labels = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, "pod1"; got != want { + t.Errorf("pod.Name = %v, want %v", got, want) + } +} + +func TestAddPod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + jm.addPod(pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + jm.addPod(pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestAddPodOrphan(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = 
alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + job3 := newJob(1, 1) + job3.Name = "job3" + job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) + + pod1 := newPod("pod1", job1) + // Make pod an orphan. Expect all matching controllers to be queued. + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + jm.addPod(pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + prev := *pod1 + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + prev = *pod2 + bumpResourceVersion(pod2) + jm.updatePod(&prev, pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Labels changed on orphan. Expect newly matching controllers to queue. 
+ prev := *pod1 + prev.Labels = map[string]string{"foo2": "bar2"} + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodChangeControllerRef(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Changed ControllerRef. Expect both old and new to queue. + prev := *pod1 + prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(job2)} + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodRelease(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + // Remove ControllerRef. Expect all matching to queue for adoption. 
+ prev := *pod1 + pod1.OwnerReferences = nil + bumpResourceVersion(pod1) + jm.updatePod(&prev, pod1) + if got, want := jm.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestDeletePod(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job2) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + jm.deletePod(pod1) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(job1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + jm.deletePod(pod2) + if got, want := jm.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = jm.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(job2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestDeletePodOrphan(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + job1 := newJob(1, 1) + job1.Name = "job1" + job2 := newJob(1, 1) + job2.Name = "job2" + job3 := newJob(1, 1) + job3.Name = "job3" + job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) + + pod1 := newPod("pod1", job1) + pod1.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + + jm.deletePod(pod1) + if got, want := jm.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + type FakeJobExpectations struct { *controller.ControllerExpectations satisfied bool @@ -723,3 +1203,8 @@ func TestWatchPods(t *testing.T) { t.Log("Waiting for pod to reach syncHandler") <-received } + +func bumpResourceVersion(obj metav1.Object) { + ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) + obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) +} diff --git a/pkg/controller/job/utils.go b/pkg/controller/job/utils.go index 5d14f054c60..6e8be3d76a4 100644 --- a/pkg/controller/job/utils.go +++ b/pkg/controller/job/utils.go @@ -17,6 +17,7 @@ limitations under the License. 
package job import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/v1" batch "k8s.io/kubernetes/pkg/apis/batch/v1" ) @@ -29,3 +30,16 @@ func IsJobFinished(j *batch.Job) bool { } return false } + +func newControllerRef(j *batch.Job) *metav1.OwnerReference { + blockOwnerDeletion := true + isController := true + return &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: j.Name, + UID: j.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, + } +} diff --git a/pkg/registry/batch/job/BUILD b/pkg/registry/batch/job/BUILD index 623a34d697f..c4a356516e1 100644 --- a/pkg/registry/batch/job/BUILD +++ b/pkg/registry/batch/job/BUILD @@ -26,6 +26,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library", ], @@ -44,6 +45,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", ], ) diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 4a184a9f69a..4bfdb3213f8 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -43,6 +44,12 @@ type jobStrategy struct { // Strategy is the default logic that applies when creating and updating Replication Controller objects. var Strategy = jobStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (jobStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all jobs need to be within a namespace. func (jobStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go index 590afa0e98f..1586c4994d1 100644 --- a/pkg/registry/batch/job/strategy_test.go +++ b/pkg/registry/batch/job/strategy_test.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -100,6 +101,13 @@ func TestJobStrategy(t *testing.T) { if len(errs) == 0 { t.Errorf("Expected a validation error") } + + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. 
+ var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) + } } func TestJobStrategyWithGeneration(t *testing.T) { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 3492336aa02..ddf83429177 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -152,7 +152,7 @@ func init() { Rules: []rbac.PolicyRule{ rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("jobs").RuleOrDie(), rbac.NewRule("update").Groups(batchGroup).Resources("jobs/status").RuleOrDie(), - rbac.NewRule("list", "watch", "create", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), + rbac.NewRule("list", "watch", "create", "delete", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(), eventsRule(), }, }) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index a0e420faac1..9a9e031bdd0 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -493,6 +493,7 @@ items: - create - delete - list + - patch - watch - apiGroups: - "" diff --git a/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go b/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go index 4b602d0aeb3..8c285e8daf5 100644 --- a/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go +++ b/staging/src/k8s.io/client-go/listers/batch/v1/job_expansion.go @@ -28,13 +28,17 @@ import ( // JobListerExpansion allows custom methods to be added to // JobLister. type JobListerExpansion interface { - // GetPodJobs returns a list of jobs managing a pod. An error is returned only - // if no matching jobs are found. + // GetPodJobs returns a list of Jobs that potentially + // match a Pod. Only the one specified in the Pod's ControllerRef + // will actually manage it. + // Returns an error only if no matching Jobs are found. GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) } -// GetPodJobs returns a list of jobs managing a pod. An error is returned only -// if no matching jobs are found. +// GetPodJobs returns a list of Jobs that potentially +// match a Pod. Only the one specified in the Pod's ControllerRef +// will actually manage it. +// Returns an error only if no matching Jobs are found. 
func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) { if len(pod.Labels) == 0 { err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) diff --git a/test/e2e/cronjob.go b/test/e2e/cronjob.go index d6cadef3a71..8afbcecb6d1 100644 --- a/test/e2e/cronjob.go +++ b/test/e2e/cronjob.go @@ -125,7 +125,7 @@ var _ = framework.KubeDescribe("CronJob", func() { Expect(err).NotTo(HaveOccurred()) Expect(cronJob.Status.Active).Should(HaveLen(1)) - By("Ensuring exaclty one running job exists by listing jobs explicitly") + By("Ensuring exactly one running job exists by listing jobs explicitly") jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) activeJobs, _ := filterActiveJobs(jobs) @@ -157,7 +157,7 @@ var _ = framework.KubeDescribe("CronJob", func() { Expect(err).NotTo(HaveOccurred()) Expect(cronJob.Status.Active).Should(HaveLen(1)) - By("Ensuring exaclty one running job exists by listing jobs explicitly") + By("Ensuring exactly one running job exists by listing jobs explicitly") jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) activeJobs, _ := filterActiveJobs(jobs) @@ -445,13 +445,15 @@ func waitForJobReplaced(c clientset.Interface, ns, previousJobName string) error if err != nil { return false, err } - if len(jobs.Items) > 1 { + // Ignore Jobs pending deletion, since deletion of Jobs is now asynchronous. + aliveJobs := filterNotDeletedJobs(jobs) + if len(aliveJobs) > 1 { return false, fmt.Errorf("More than one job is running %+v", jobs.Items) - } else if len(jobs.Items) == 0 { + } else if len(aliveJobs) == 0 { framework.Logf("Warning: Found 0 jobs in namespace %v", ns) return false, nil } - return jobs.Items[0].Name != previousJobName, nil + return aliveJobs[0].Name != previousJobName, nil }) } @@ -502,6 +504,19 @@ func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reaso return nil } +// filterNotDeletedJobs returns the job list without any jobs that are pending +// deletion. +func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job { + var alive []*batchv1.Job + for i := range jobs.Items { + job := &jobs.Items[i] + if job.DeletionTimestamp == nil { + alive = append(alive, job) + } + } + return alive +} + func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) { for i := range jobs.Items { j := jobs.Items[i] diff --git a/test/e2e/framework/jobs_util.go b/test/e2e/framework/jobs_util.go index e333f8ebefb..084b583d100 100644 --- a/test/e2e/framework/jobs_util.go +++ b/test/e2e/framework/jobs_util.go @@ -48,6 +48,9 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl ObjectMeta: metav1.ObjectMeta{ Name: name, }, + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + }, Spec: batch.JobSpec{ Parallelism: ¶llelism, Completions: &completions, @@ -151,13 +154,18 @@ func DeleteJob(c clientset.Interface, ns, name string) error { return c.Batch().Jobs(ns).Delete(name, nil) } +// GetJobPods returns a list of Pods belonging to a Job. +func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) { + label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName})) + options := metav1.ListOptions{LabelSelector: label.String()} + return c.CoreV1().Pods(ns).List(options) +} + // WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. 
Only use // when pods will run for a long time, or it will be racy. func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error { - label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName})) return wait.Poll(Poll, JobTimeout, func() (bool, error) { - options := metav1.ListOptions{LabelSelector: label.String()} - pods, err := c.Core().Pods(ns).List(options) + pods, err := GetJobPods(c, ns, jobName) if err != nil { return false, err } diff --git a/test/e2e/job.go b/test/e2e/job.go index 12bf5f09f60..08212bf42d4 100644 --- a/test/e2e/job.go +++ b/test/e2e/job.go @@ -26,8 +26,11 @@ import ( "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/test/e2e/framework" + "fmt" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/controller" ) var _ = framework.KubeDescribe("Job", func() { @@ -106,4 +109,58 @@ var _ = framework.KubeDescribe("Job", func() { Expect(err).To(HaveOccurred()) Expect(errors.IsNotFound(err)).To(BeTrue()) }) + + It("should adopt matching orphans and release non-matching pods", func() { + By("Creating a job") + job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions) + // Replace job with the one returned from Create() so it has the UID. + // Save Kind since it won't be populated in the returned job. + kind := job.Kind + job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) + Expect(err).NotTo(HaveOccurred()) + job.Kind = kind + + By("Ensuring active pods == parallelism") + err = framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) + Expect(err).NotTo(HaveOccurred()) + + By("Orphaning one of the Job's Pods") + pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(int(parallelism))) + pod := pods.Items[0] + f.PodClient().Update(pod.Name, func(pod *v1.Pod) { + pod.OwnerReferences = nil + }) + + By("Checking that the Job readopts the Pod") + Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", framework.JobTimeout, + func(pod *v1.Pod) (bool, error) { + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + return false, nil + } + if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID { + return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job) + } + return true, nil + }, + )).To(Succeed(), "wait for pod %q to be readopted", pod.Name) + + By("Removing the labels from the Job's Pod") + f.PodClient().Update(pod.Name, func(pod *v1.Pod) { + pod.Labels = nil + }) + + By("Checking that the Job releases the Pod") + Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", framework.JobTimeout, + func(pod *v1.Pod) (bool, error) { + controllerRef := controller.GetControllerOf(pod) + if controllerRef != nil { + return false, nil + } + return true, nil + }, + )).To(Succeed(), "wait for pod %q to be released", pod.Name) + }) })
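
Reviewer note (not part of the patch): the claim semantics that getPodsForJob delegates to controller.NewPodControllerRefManager(...).ClaimPods(...) are summarized in the sketch below. It is a simplified illustration under stated assumptions, not the real ClaimPods code: the claimPod helper and its boolean return convention are invented here for clarity, and the actual manager additionally issues adopt/release patches, handles errors, and runs the canAdoptFunc quorum recheck before adopting.

package job

import (
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/api/v1"
	batch "k8s.io/kubernetes/pkg/apis/batch/v1"
	"k8s.io/kubernetes/pkg/controller"
)

// claimPod is a hypothetical helper that reports whether the given Job should
// end up managing the Pod, mirroring the keep/adopt/release decision described
// in this patch.
func claimPod(job *batch.Job, selector labels.Selector, pod *v1.Pod) bool {
	controllerRef := controller.GetControllerOf(pod)
	if controllerRef != nil {
		if controllerRef.UID != job.UID {
			// Owned by a different controller: never touch it.
			return false
		}
		if selector.Matches(labels.Set(pod.Labels)) {
			// Owned by this Job and still matching the selector: keep it.
			return true
		}
		// Owned but no longer matching: the real manager releases it by
		// patching the ControllerRef away (see TestGetPodsForJobRelease).
		return false
	}
	// Orphan: adopt only if it matches and neither object is being deleted
	// (the real manager also re-reads the Job via canAdoptFunc, see #42639).
	if !selector.Matches(labels.Set(pod.Labels)) {
		return false
	}
	if pod.DeletionTimestamp != nil || job.DeletionTimestamp != nil {
		return false
	}
	// Adopt: the real manager patches the Pod to add *newControllerRef(job).
	return true
}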