diff --git a/test/e2e/framework/jobs_util.go b/test/e2e/framework/jobs_util.go index 0862e615284..68f4b8b4068 100644 --- a/test/e2e/framework/jobs_util.go +++ b/test/e2e/framework/jobs_util.go @@ -46,6 +46,9 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl ObjectMeta: metav1.ObjectMeta{ Name: name, }, + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + }, Spec: batch.JobSpec{ Parallelism: ¶llelism, Completions: &completions, @@ -126,13 +129,18 @@ func DeleteJob(c clientset.Interface, ns, name string) error { return c.Batch().Jobs(ns).Delete(name, nil) } +// GetJobPods returns a list of Pods belonging to a Job. +func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) { + label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName})) + options := metav1.ListOptions{LabelSelector: label.String()} + return c.CoreV1().Pods(ns).List(options) +} + // WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use // when pods will run for a long time, or it will be racy. func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error { - label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName})) return wait.Poll(Poll, JobTimeout, func() (bool, error) { - options := metav1.ListOptions{LabelSelector: label.String()} - pods, err := c.Core().Pods(ns).List(options) + pods, err := GetJobPods(c, ns, jobName) if err != nil { return false, err } diff --git a/test/e2e/job.go b/test/e2e/job.go index 12bf5f09f60..08212bf42d4 100644 --- a/test/e2e/job.go +++ b/test/e2e/job.go @@ -26,8 +26,11 @@ import ( "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/test/e2e/framework" + "fmt" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/controller" ) var _ = framework.KubeDescribe("Job", func() { @@ -106,4 +109,58 @@ var _ = framework.KubeDescribe("Job", func() { Expect(err).To(HaveOccurred()) Expect(errors.IsNotFound(err)).To(BeTrue()) }) + + It("should adopt matching orphans and release non-matching pods", func() { + By("Creating a job") + job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions) + // Replace job with the one returned from Create() so it has the UID. + // Save Kind since it won't be populated in the returned job. + kind := job.Kind + job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) + Expect(err).NotTo(HaveOccurred()) + job.Kind = kind + + By("Ensuring active pods == parallelism") + err = framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism) + Expect(err).NotTo(HaveOccurred()) + + By("Orphaning one of the Job's Pods") + pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(int(parallelism))) + pod := pods.Items[0] + f.PodClient().Update(pod.Name, func(pod *v1.Pod) { + pod.OwnerReferences = nil + }) + + By("Checking that the Job readopts the Pod") + Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", framework.JobTimeout, + func(pod *v1.Pod) (bool, error) { + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + return false, nil + } + if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID { + return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job) + } + return true, nil + }, + )).To(Succeed(), "wait for pod %q to be readopted", pod.Name) + + By("Removing the labels from the Job's Pod") + f.PodClient().Update(pod.Name, func(pod *v1.Pod) { + pod.Labels = nil + }) + + By("Checking that the Job releases the Pod") + Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", framework.JobTimeout, + func(pod *v1.Pod) (bool, error) { + controllerRef := controller.GetControllerOf(pod) + if controllerRef != nil { + return false, nil + } + return true, nil + }, + )).To(Succeed(), "wait for pod %q to be released", pod.Name) + }) })