Merge pull request #42176 from enisoc/controller-ref-job

Automatic merge from submit-queue (batch tested with PRs 42177, 42176, 44721)

Job: Respect ControllerRef

**What this PR does / why we need it**:

This is part of the completion of the [ControllerRef](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md) proposal. It brings Job into full compliance with ControllerRef. See the individual commit messages for details.

**Which issue this PR fixes**:

This ensures that Job does not fight with other controllers over control of Pods.

Ref: #24433

**Special notes for your reviewer**:

**Release note**:

```release-note
Job controller now respects ControllerRef to avoid fighting over Pods.
```
cc @erictune @kubernetes/sig-apps-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-04-20 12:57:06 -07:00 committed by GitHub
commit 7b43f922aa
15 changed files with 763 additions and 53 deletions

View File

@ -28,13 +28,17 @@ import (
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface {
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error)
}
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
func (l *jobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)

View File

@ -28,13 +28,17 @@ import (
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface {
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error)
}
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)

View File

@ -30,6 +30,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
@ -59,6 +60,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -46,6 +47,9 @@ import (
"github.com/golang/glog"
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
type JobController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
@ -140,18 +144,43 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
<-stopCh
}
// getPodJob returns the job managing the given pod.
func (jm *JobController) getPodJob(pod *v1.Pod) *batch.Job {
// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
jobs, err := jm.jobLister.GetPodJobs(pod)
if err != nil {
glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
return nil
}
if len(jobs) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
sort.Sort(byCreationTimestamp(jobs))
}
return &jobs[0]
ret := make([]*batch.Job, 0, len(jobs))
for i := range jobs {
ret = append(ret, &jobs[i])
}
return ret
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the corrrect Kind.
func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if job.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return job
}
// When a pod is created, enqueue the controller that manages it and update it's expectations.
@ -163,14 +192,28 @@ func (jm *JobController) addPod(obj interface{}) {
jm.deletePod(pod)
return
}
if job := jm.getPodJob(pod); job != nil {
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil {
return
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
return
}
// Otherwise, it's an orphan. Get a list of all matching controllers and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueController(job)
}
}
@ -193,15 +236,34 @@ func (jm *JobController) updatePod(old, cur interface{}) {
jm.deletePod(curPod)
return
}
if job := jm.getPodJob(curPod); job != nil {
jm.enqueueController(job)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
jm.enqueueController(job)
}
}
// Only need to get the old job if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
// If the old and new job are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldJob := jm.getPodJob(oldPod); oldJob != nil {
jm.enqueueController(oldJob)
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
if job == nil {
return
}
jm.enqueueController(job)
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueController(job)
}
}
}
@ -218,24 +280,31 @@ func (jm *JobController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj))
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
return
}
}
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil {
return
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
}
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
@ -281,6 +350,36 @@ func (jm *JobController) processNextWorkItem() bool {
return true
}
// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache.
func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all pods to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != j.UID {
return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)
}
return fresh, nil
})
cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc)
return cm.ClaimPods(pods)
}
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
@ -312,8 +411,8 @@ func (jm *JobController) syncJob(key string) error {
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
pods, err := jm.podStore.Pods(job.Namespace).List(selector)
pods, err := jm.getPodsForJob(&job)
if err != nil {
return err
}
@ -507,7 +606,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
for i := int32(0); i < diff; i++ {
go func() {
defer wait.Done()
if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil {
if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
jm.expectations.CreationObserved(jobKey)

View File

@ -18,12 +18,14 @@ package job
import (
"fmt"
"strconv"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
@ -44,6 +46,7 @@ func newJob(parallelism, completions int32) *batch.Job {
j := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
UID: uuid.NewUUID(),
Namespace: metav1.NamespaceDefault,
},
Spec: batch.JobSpec{
@ -91,6 +94,7 @@ func getKey(job *batch.Job, t *testing.T) string {
func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
jm.podControl = &controller.FakePodControl{}
return jm, sharedInformers
}
@ -101,9 +105,10 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
for i := int32(0); i < count; i++ {
newPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%v", rand.String(10)),
Labels: job.Spec.Selector.MatchLabels,
Namespace: job.Namespace,
Name: fmt.Sprintf("pod-%v", rand.String(10)),
Labels: job.Spec.Selector.MatchLabels,
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)},
},
Status: v1.PodStatus{Phase: status},
}
@ -274,6 +279,28 @@ func TestControllerSyncJob(t *testing.T) {
if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
}
// Each create should have an accompanying ControllerRef.
if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) {
t.Errorf("%s: unexpected number of ControllerRefs. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.ControllerRefs))
}
// Make sure the ControllerRefs are correct.
for _, controllerRef := range fakePodControl.ControllerRefs {
if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
}
if got, want := controllerRef.Kind, "Job"; got != want {
t.Errorf("controllerRef.Kind = %q, want %q", got, want)
}
if got, want := controllerRef.Name, job.Name; got != want {
t.Errorf("controllerRef.Name = %q, want %q", got, want)
}
if got, want := controllerRef.UID, job.UID; got != want {
t.Errorf("controllerRef.UID = %q, want %q", got, want)
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
t.Errorf("controllerRef.Controller is not set to true")
}
}
// validate status
if actual.Status.Active != tc.expectedActive {
t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
@ -578,7 +605,11 @@ func TestJobPodLookup(t *testing.T) {
}
for _, tc := range testCases {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job)
if job := manager.getPodJob(tc.pod); job != nil {
if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 {
if got, want := len(jobs), 1; got != want {
t.Errorf("len(jobs) = %v, want %v", got, want)
}
job := jobs[0]
if tc.expectedName != job.Name {
t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
}
@ -588,6 +619,455 @@ func TestJobPodLookup(t *testing.T) {
}
}
func newPod(name string, job *batch.Job) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: job.Spec.Selector.MatchLabels,
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)},
},
}
}
func TestGetPodsForJob(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
pod3 := newPod("pod3", job1)
// Make pod3 an orphan that doesn't match. It should be ignored.
pod3.OwnerReferences = nil
pod3.Labels = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3)
pods, err := jm.getPodsForJob(job1)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
if got, want := len(pods), 1; got != want {
t.Errorf("len(pods) = %v, want %v", got, want)
}
if got, want := pods[0].Name, "pod1"; got != want {
t.Errorf("pod.Name = %v, want %v", got, want)
}
pods, err = jm.getPodsForJob(job2)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
if got, want := len(pods), 1; got != want {
t.Errorf("len(pods) = %v, want %v", got, want)
}
if got, want := pods[0].Name, "pod2"; got != want {
t.Errorf("pod.Name = %v, want %v", got, want)
}
}
func TestGetPodsForJobAdopt(t *testing.T) {
job1 := newJob(1, 1)
job1.Name = "job1"
clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job1)
// Make this pod an orphan. It should still be returned because it's adopted.
pod2.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
pods, err := jm.getPodsForJob(job1)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
if got, want := len(pods), 2; got != want {
t.Errorf("len(pods) = %v, want %v", got, want)
}
}
func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
job1 := newJob(1, 1)
job1.Name = "job1"
job1.DeletionTimestamp = &metav1.Time{}
clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job1)
// Make this pod an orphan. It should not be adopted because the Job is being deleted.
pod2.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
pods, err := jm.getPodsForJob(job1)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
if got, want := len(pods), 1; got != want {
t.Errorf("len(pods) = %v, want %v", got, want)
}
if got, want := pods[0].Name, pod1.Name; got != want {
t.Errorf("pod.Name = %q, want %q", got, want)
}
}
func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
job1 := newJob(1, 1)
job1.Name = "job1"
// The up-to-date object says it's being deleted.
job1.DeletionTimestamp = &metav1.Time{}
clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// The cache says it's NOT being deleted.
cachedJob := *job1
cachedJob.DeletionTimestamp = nil
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(&cachedJob)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job1)
// Make this pod an orphan. It should not be adopted because the Job is being deleted.
pod2.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
pods, err := jm.getPodsForJob(job1)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
if got, want := len(pods), 1; got != want {
t.Errorf("len(pods) = %v, want %v", got, want)
}
if got, want := pods[0].Name, pod1.Name; got != want {
t.Errorf("pod.Name = %q, want %q", got, want)
}
}
func TestGetPodsForJobRelease(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job1)
// Make this pod not match, even though it's owned. It should be released.
pod2.Labels = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
pods, err := jm.getPodsForJob(job1)
if err != nil {
t.Fatalf("getPodsForJob() error: %v", err)
}
if got, want := len(pods), 1; got != want {
t.Errorf("len(pods) = %v, want %v", got, want)
}
if got, want := pods[0].Name, "pod1"; got != want {
t.Errorf("pod.Name = %v, want %v", got, want)
}
}
func TestAddPod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
jm.addPod(pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(job1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
jm.addPod(pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(job2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestAddPodOrphan(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
job3 := newJob(1, 1)
job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
pod1 := newPod("pod1", job1)
// Make pod an orphan. Expect all matching controllers to be queued.
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
jm.addPod(pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
prev := *pod1
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(job1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
prev = *pod2
bumpResourceVersion(pod2)
jm.updatePod(&prev, pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(job2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
// Labels changed on orphan. Expect newly matching controllers to queue.
prev := *pod1
prev.Labels = map[string]string{"foo2": "bar2"}
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePodChangeControllerRef(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
// Changed ControllerRef. Expect both old and new to queue.
prev := *pod1
prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(job2)}
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePodRelease(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
// Remove ControllerRef. Expect all matching to queue for adoption.
prev := *pod1
pod1.OwnerReferences = nil
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestDeletePod(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
pod1 := newPod("pod1", job1)
pod2 := newPod("pod2", job2)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
jm.deletePod(pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(job1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
jm.deletePod(pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = jm.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(job2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestDeletePodOrphan(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1.Name = "job1"
job2 := newJob(1, 1)
job2.Name = "job2"
job3 := newJob(1, 1)
job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
pod1 := newPod("pod1", job1)
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
jm.deletePod(pod1)
if got, want := jm.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
type FakeJobExpectations struct {
*controller.ControllerExpectations
satisfied bool
@ -723,3 +1203,8 @@ func TestWatchPods(t *testing.T) {
t.Log("Waiting for pod to reach syncHandler")
<-received
}
func bumpResourceVersion(obj metav1.Object) {
ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package job
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1"
batch "k8s.io/kubernetes/pkg/apis/batch/v1"
)
@ -29,3 +30,16 @@ func IsJobFinished(j *batch.Job) bool {
}
return false
}
func newControllerRef(j *batch.Job) *metav1.OwnerReference {
blockOwnerDeletion := true
isController := true
return &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: j.Name,
UID: j.UID,
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
}
}

View File

@ -26,6 +26,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
],
@ -44,6 +45,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
],
)

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/kubernetes/pkg/api"
@ -43,6 +44,12 @@ type jobStrategy struct {
// Strategy is the default logic that applies when creating and updating Replication Controller objects.
var Strategy = jobStrategy{api.Scheme, names.SimpleNameGenerator}
// DefaultGarbageCollectionPolicy returns Orphan because that was the default
// behavior before the server-side garbage collection was implemented.
func (jobStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy {
return rest.OrphanDependents
}
// NamespaceScoped returns true because all jobs need to be within a namespace.
func (jobStrategy) NamespaceScoped() bool {
return true

View File

@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
@ -100,6 +101,13 @@ func TestJobStrategy(t *testing.T) {
if len(errs) == 0 {
t.Errorf("Expected a validation error")
}
// Make sure we correctly implement the interface.
// Otherwise a typo could silently change the default.
var gcds rest.GarbageCollectionDeleteStrategy = Strategy
if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want {
t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want)
}
}
func TestJobStrategyWithGeneration(t *testing.T) {

View File

@ -152,7 +152,7 @@ func init() {
Rules: []rbac.PolicyRule{
rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("jobs").RuleOrDie(),
rbac.NewRule("update").Groups(batchGroup).Resources("jobs/status").RuleOrDie(),
rbac.NewRule("list", "watch", "create", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbac.NewRule("list", "watch", "create", "delete", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
eventsRule(),
},
})

View File

@ -493,6 +493,7 @@ items:
- create
- delete
- list
- patch
- watch
- apiGroups:
- ""

View File

@ -28,13 +28,17 @@ import (
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface {
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error)
}
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)

View File

@ -125,7 +125,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
Expect(err).NotTo(HaveOccurred())
Expect(cronJob.Status.Active).Should(HaveLen(1))
By("Ensuring exaclty one running job exists by listing jobs explicitly")
By("Ensuring exactly one running job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs, _ := filterActiveJobs(jobs)
@ -157,7 +157,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
Expect(err).NotTo(HaveOccurred())
Expect(cronJob.Status.Active).Should(HaveLen(1))
By("Ensuring exaclty one running job exists by listing jobs explicitly")
By("Ensuring exactly one running job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs, _ := filterActiveJobs(jobs)
@ -445,13 +445,15 @@ func waitForJobReplaced(c clientset.Interface, ns, previousJobName string) error
if err != nil {
return false, err
}
if len(jobs.Items) > 1 {
// Ignore Jobs pending deletion, since deletion of Jobs is now asynchronous.
aliveJobs := filterNotDeletedJobs(jobs)
if len(aliveJobs) > 1 {
return false, fmt.Errorf("More than one job is running %+v", jobs.Items)
} else if len(jobs.Items) == 0 {
} else if len(aliveJobs) == 0 {
framework.Logf("Warning: Found 0 jobs in namespace %v", ns)
return false, nil
}
return jobs.Items[0].Name != previousJobName, nil
return aliveJobs[0].Name != previousJobName, nil
})
}
@ -502,6 +504,19 @@ func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reaso
return nil
}
// filterNotDeletedJobs returns the job list without any jobs that are pending
// deletion.
func filterNotDeletedJobs(jobs *batchv1.JobList) []*batchv1.Job {
var alive []*batchv1.Job
for i := range jobs.Items {
job := &jobs.Items[i]
if job.DeletionTimestamp == nil {
alive = append(alive, job)
}
}
return alive
}
func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
for i := range jobs.Items {
j := jobs.Items[i]

View File

@ -48,6 +48,9 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
TypeMeta: metav1.TypeMeta{
Kind: "Job",
},
Spec: batch.JobSpec{
Parallelism: &parallelism,
Completions: &completions,
@ -151,13 +154,18 @@ func DeleteJob(c clientset.Interface, ns, name string) error {
return c.Batch().Jobs(ns).Delete(name, nil)
}
// GetJobPods returns a list of Pods belonging to a Job.
func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
options := metav1.ListOptions{LabelSelector: label.String()}
return c.CoreV1().Pods(ns).List(options)
}
// WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use
// when pods will run for a long time, or it will be racy.
func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
return wait.Poll(Poll, JobTimeout, func() (bool, error) {
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := c.Core().Pods(ns).List(options)
pods, err := GetJobPods(c, ns, jobName)
if err != nil {
return false, err
}

View File

@ -26,8 +26,11 @@ import (
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/test/e2e/framework"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/controller"
)
var _ = framework.KubeDescribe("Job", func() {
@ -106,4 +109,58 @@ var _ = framework.KubeDescribe("Job", func() {
Expect(err).To(HaveOccurred())
Expect(errors.IsNotFound(err)).To(BeTrue())
})
It("should adopt matching orphans and release non-matching pods", func() {
By("Creating a job")
job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions)
// Replace job with the one returned from Create() so it has the UID.
// Save Kind since it won't be populated in the returned job.
kind := job.Kind
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
job.Kind = kind
By("Ensuring active pods == parallelism")
err = framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
Expect(err).NotTo(HaveOccurred())
By("Orphaning one of the Job's Pods")
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(int(parallelism)))
pod := pods.Items[0]
f.PodClient().Update(pod.Name, func(pod *v1.Pod) {
pod.OwnerReferences = nil
})
By("Checking that the Job readopts the Pod")
Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", framework.JobTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
return false, nil
}
if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID {
return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job)
}
return true, nil
},
)).To(Succeed(), "wait for pod %q to be readopted", pod.Name)
By("Removing the labels from the Job's Pod")
f.PodClient().Update(pod.Name, func(pod *v1.Pod) {
pod.Labels = nil
})
By("Checking that the Job releases the Pod")
Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", framework.JobTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := controller.GetControllerOf(pod)
if controllerRef != nil {
return false, nil
}
return true, nil
},
)).To(Succeed(), "wait for pod %q to be released", pod.Name)
})
})