diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 8e956350882..377d44b4718 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -326,6 +326,14 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta } +// TerminatePod ensures that the status of containers is properly defaulted at the end of the pod +// lifecycle. As the Kubelet must reconcile with the container runtime to observe container status +// there is always the possibility we are unable to retrieve one or more container statuses due to +// garbage collection, admin action, or loss of temporary data on a restart. This method ensures +// that any absent container status is treated as a failure so that we do not incorrectly describe +// the pod as successful. If we have not yet initialized the pod in the presence of init containers, +// the init container failure status is sufficient to describe the pod as failing, and we do not need +// to override waiting containers (unless there is evidence the pod previously started those containers). 
func (m *manager) TerminatePod(pod *v1.Pod) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 469d0060ce9..8b0be23da24 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -25,6 +25,7 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -35,6 +36,7 @@ import ( e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" + "k8s.io/kubernetes/test/e2e/scheduling" "k8s.io/utils/pointer" "github.com/onsi/ginkgo" @@ -45,6 +47,10 @@ var _ = SIGDescribe("Job", func() { f := framework.NewDefaultFramework("job") parallelism := int32(2) completions := int32(4) + + largeParallelism := int32(90) + largeCompletions := int32(90) + backoffLimit := int32(6) // default value // Simplest case: N pods succeed @@ -361,6 +367,52 @@ var _ = SIGDescribe("Job", func() { framework.ExpectEqual(pod.Status.Phase, v1.PodFailed) } }) + + ginkgo.It("should run a job to completion with CPU requests [Serial]", func() { + ginkgo.By("Creating a job with CPU requests") + + testNodeName := scheduling.GetNodeThatCanRunPod(f) + targetNode, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), testNodeName, metav1.GetOptions{}) + framework.ExpectNoError(err, "unable to get node object for node %v", testNodeName) + + cpu, ok := targetNode.Status.Allocatable[v1.ResourceCPU] + if !ok { + framework.Failf("Unable to get node's %q cpu", targetNode.Name) + } + + cpuRequest := fmt.Sprint(int64(0.2 * float64(cpu.Value()))) + + backoff := 0 + ginkgo.By("Creating a job") + job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, largeParallelism, largeCompletions, nil, int32(backoff)) + for i 
:= range job.Spec.Template.Spec.Containers { + job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(cpuRequest), + }, + } + job.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": testNodeName} + } + + framework.Logf("Creating job %q with a node hostname selector %q with cpu request %q", job.Name, testNodeName, cpuRequest) + job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Ensuring job reaches completions") + err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, largeCompletions) + framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) + + ginkgo.By("Ensuring pods for job exist") + pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name) + successes := int32(0) + for _, pod := range pods.Items { + if pod.Status.Phase == v1.PodSucceeded { + successes++ + } + } + framework.ExpectEqual(successes, largeCompletions, "expected %d successful job pods, but got %d", largeCompletions, successes) + }) }) // waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.