From f502ab6a8b9621c5a8d01f3ed7f1a0af03406eca Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 12 Apr 2017 15:21:27 -0700 Subject: [PATCH] Job: Recheck DeletionTimestamp before adoption. This is necessary to avoid racing with the GC when it orphans dependents. --- pkg/controller/job/jobcontroller.go | 14 ++++- pkg/controller/job/jobcontroller_test.go | 70 +++++++++++++++++++++++- 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 1f32676482d..d818f0e85b4 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -364,7 +364,19 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) { if err != nil { return nil, err } - cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind) + // If any adoptions are attempted, we should first recheck for deletion + // with an uncached quorum read sometime after listing Pods (see #42639). + canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != j.UID { + return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID) + } + return fresh, nil + }) + cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc) return cm.ClaimPods(pods) } diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 0376668f881..0eadda6f14c 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -677,13 +677,13 @@ func TestGetPodsForJob(t *testing.T) { } func TestGetPodsForJobAdopt(t *testing.T) { - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + job1 := newJob(1, 1) + job1.Name = "job1" + clientset := fake.NewSimpleClientset(job1) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) - job1.Name = "job1" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) pod1 := newPod("pod1", job1) @@ -702,6 +702,70 @@ func TestGetPodsForJobAdopt(t *testing.T) { } } +func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + job1.DeletionTimestamp = &metav1.Time{} + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should not be adopted because the Job is being deleted. + pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, pod1.Name; got != want { + t.Errorf("pod.Name = %q, want %q", got, want) + } +} + +func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { + job1 := newJob(1, 1) + job1.Name = "job1" + // The up-to-date object says it's being deleted. + job1.DeletionTimestamp = &metav1.Time{} + clientset := fake.NewSimpleClientset(job1) + jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + + // The cache says it's NOT being deleted. + cachedJob := *job1 + cachedJob.DeletionTimestamp = nil + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(&cachedJob) + + pod1 := newPod("pod1", job1) + pod2 := newPod("pod2", job1) + // Make this pod an orphan. It should not be adopted because the Job is being deleted. + pod2.OwnerReferences = nil + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) + informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) + + pods, err := jm.getPodsForJob(job1) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + if got, want := len(pods), 1; got != want { + t.Errorf("len(pods) = %v, want %v", got, want) + } + if got, want := pods[0].Name, pod1.Name; got != want { + t.Errorf("pod.Name = %q, want %q", got, want) + } +} + func TestGetPodsForJobRelease(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)