diff --git a/pkg/controller/job/BUILD b/pkg/controller/job/BUILD
index 4d535551f3e..e37de5221ba 100644
--- a/pkg/controller/job/BUILD
+++ b/pkg/controller/job/BUILD
@@ -30,6 +30,7 @@ go_library(
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
@@ -59,6 +60,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go
index 9c55e45cbb6..7ed75c68900 100644
--- a/pkg/controller/job/jobcontroller.go
+++ b/pkg/controller/job/jobcontroller.go
@@ -25,6 +25,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -284,6 +285,24 @@ func (jm *JobController) processNextWorkItem() bool {
 	return true
 }
 
+// getPodsForJob returns the set of pods that this Job should manage.
+// It also reconciles ControllerRef by adopting/orphaning.
+// Note that the returned Pods are pointers into the cache.
+func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
+	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
+	}
+	// List all pods to include those that don't match the selector anymore
+	// but have a ControllerRef pointing to this controller.
+	pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind)
+	return cm.ClaimPods(pods)
+}
+
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
@@ -315,8 +334,8 @@ func (jm *JobController) syncJob(key string) error {
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
 	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
 	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
-	selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
-	pods, err := jm.podStore.Pods(job.Namespace).List(selector)
+
+	pods, err := jm.getPodsForJob(&job)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go
index e4f1ed358e0..226f6bd4b85 100644
--- a/pkg/controller/job/jobcontroller_test.go
+++ b/pkg/controller/job/jobcontroller_test.go
@@ -24,6 +24,7 @@ import (
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	restclient "k8s.io/client-go/rest"
@@ -44,6 +45,7 @@ func newJob(parallelism, completions int32) *batch.Job {
 	j := &batch.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "foobar",
+			UID:       uuid.NewUUID(),
 			Namespace: metav1.NamespaceDefault,
 		},
 		Spec: batch.JobSpec{
@@ -91,6 +93,7 @@ func getKey(job *batch.Job, t *testing.T) string {
 func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
 	sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
 	jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
+	jm.podControl = &controller.FakePodControl{}
 	return jm, sharedInformers
 }
 
@@ -605,6 +608,118 @@ func TestJobPodLookup(t *testing.T) {
 	}
 }
 
+func newPod(name string, job *batch.Job) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Labels:          job.Spec.Selector.MatchLabels,
+			Namespace:       job.Namespace,
+			OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)},
+		},
+	}
+}
+
+func TestGetPodsForJob(t *testing.T) {
+	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+	jm.podStoreSynced = alwaysReady
+	jm.jobStoreSynced = alwaysReady
+
+	job1 := newJob(1, 1)
+	job1.Name = "job1"
+	job2 := newJob(1, 1)
+	job2.Name = "job2"
+	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
+	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
+
+	pod1 := newPod("pod1", job1)
+	pod2 := newPod("pod2", job2)
+	pod3 := newPod("pod3", job1)
+	// Make pod3 an orphan that doesn't match. It should be ignored.
+	pod3.OwnerReferences = nil
+	pod3.Labels = nil
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3)
+
+	pods, err := jm.getPodsForJob(job1)
+	if err != nil {
+		t.Fatalf("getPodsForJob() error: %v", err)
+	}
+	if got, want := len(pods), 1; got != want {
+		t.Errorf("len(pods) = %v, want %v", got, want)
+	}
+	if got, want := pods[0].Name, "pod1"; got != want {
+		t.Errorf("pod.Name = %v, want %v", got, want)
+	}
+
+	pods, err = jm.getPodsForJob(job2)
+	if err != nil {
+		t.Fatalf("getPodsForJob() error: %v", err)
+	}
+	if got, want := len(pods), 1; got != want {
+		t.Errorf("len(pods) = %v, want %v", got, want)
+	}
+	if got, want := pods[0].Name, "pod2"; got != want {
+		t.Errorf("pod.Name = %v, want %v", got, want)
+	}
+}
+
+func TestGetPodsForJobAdopt(t *testing.T) {
+	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+	jm.podStoreSynced = alwaysReady
+	jm.jobStoreSynced = alwaysReady
+
+	job1 := newJob(1, 1)
+	job1.Name = "job1"
+	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
+
+	pod1 := newPod("pod1", job1)
+	pod2 := newPod("pod2", job1)
+	// Make this pod an orphan. It should still be returned because it's adopted.
+	pod2.OwnerReferences = nil
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
+
+	pods, err := jm.getPodsForJob(job1)
+	if err != nil {
+		t.Fatalf("getPodsForJob() error: %v", err)
+	}
+	if got, want := len(pods), 2; got != want {
+		t.Errorf("len(pods) = %v, want %v", got, want)
+	}
+}
+
+func TestGetPodsForJobRelease(t *testing.T) {
+	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+	jm.podStoreSynced = alwaysReady
+	jm.jobStoreSynced = alwaysReady
+
+	job1 := newJob(1, 1)
+	job1.Name = "job1"
+	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
+
+	pod1 := newPod("pod1", job1)
+	pod2 := newPod("pod2", job1)
+	// Make this pod not match, even though it's owned. It should be released.
+	pod2.Labels = nil
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
+	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
+
+	pods, err := jm.getPodsForJob(job1)
+	if err != nil {
+		t.Fatalf("getPodsForJob() error: %v", err)
+	}
+	if got, want := len(pods), 1; got != want {
+		t.Errorf("len(pods) = %v, want %v", got, want)
+	}
+	if got, want := pods[0].Name, "pod1"; got != want {
+		t.Errorf("pod.Name = %v, want %v", got, want)
+	}
+}
+
 type FakeJobExpectations struct {
 	*controller.ControllerExpectations
 	satisfied bool