Merge pull request #76401 from chardch/gpu-job-with-recreate

E2E test for GPU job interrupted by node recreate
This commit is contained in:
Kubernetes Prow Robot 2019-05-17 13:04:12 -07:00 committed by GitHub
commit a4fc418c84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 139 additions and 69 deletions

View File

@ -22,7 +22,7 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -248,6 +248,23 @@ func (c *PodClient) WaitForFailure(name string, timeout time.Duration) {
)).To(gomega.Succeed(), "wait for pod %q to fail", name) )).To(gomega.Succeed(), "wait for pod %q to fail", name)
} }
// WaitForFinish waits for pod to finish running, regardless of success or failure.
func (c *PodClient) WaitForFinish(name string, timeout time.Duration) {
f := c.f
gomega.Expect(WaitForPodCondition(f.ClientSet, f.Namespace.Name, name, "success or failure", timeout,
func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodFailed:
return true, nil
case v1.PodSucceeded:
return true, nil
default:
return false, nil
}
},
)).To(gomega.Succeed(), "wait for pod %q to finish running", name)
}
// WaitForErrorEventOrSuccess waits for pod to succeed or an error event for that pod. // WaitForErrorEventOrSuccess waits for pod to succeed or an error event for that pod.
func (c *PodClient) WaitForErrorEventOrSuccess(pod *v1.Pod) (*v1.Event, error) { func (c *PodClient) WaitForErrorEventOrSuccess(pod *v1.Pod) (*v1.Event, error) {
var ev *v1.Event var ev *v1.Event

View File

@ -92,12 +92,12 @@ var _ = ginkgo.Describe("Recreate [Feature:Recreate]", func() {
// Recreate all the nodes in the test instance group // Recreate all the nodes in the test instance group
func testRecreate(c clientset.Interface, ps *testutils.PodStore, systemNamespace string, nodes []v1.Node, podNames []string) { func testRecreate(c clientset.Interface, ps *testutils.PodStore, systemNamespace string, nodes []v1.Node, podNames []string) {
err := recreateNodes(c, nodes) err := RecreateNodes(c, nodes)
if err != nil { if err != nil {
framework.Failf("Test failed; failed to start the restart instance group command.") framework.Failf("Test failed; failed to start the restart instance group command.")
} }
err = waitForNodeBootIdsToChange(c, nodes, framework.RecreateNodeReadyAgainTimeout) err = WaitForNodeBootIdsToChange(c, nodes, framework.RecreateNodeReadyAgainTimeout)
if err != nil { if err != nil {
framework.Failf("Test failed; failed to recreate at least one node in %v.", framework.RecreateNodeReadyAgainTimeout) framework.Failf("Test failed; failed to recreate at least one node in %v.", framework.RecreateNodeReadyAgainTimeout)
} }

View File

@ -28,7 +28,8 @@ import (
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
) )
func recreateNodes(c clientset.Interface, nodes []v1.Node) error { // RecreateNodes recreates the given nodes in a managed instance group.
func RecreateNodes(c clientset.Interface, nodes []v1.Node) error {
// Build mapping from zone to nodes in that zone. // Build mapping from zone to nodes in that zone.
nodeNamesByZone := make(map[string][]string) nodeNamesByZone := make(map[string][]string)
for i := range nodes { for i := range nodes {
@ -63,13 +64,14 @@ func recreateNodes(c clientset.Interface, nodes []v1.Node) error {
framework.Logf("Recreating instance group %s.", instanceGroup) framework.Logf("Recreating instance group %s.", instanceGroup)
stdout, stderr, err := framework.RunCmd("gcloud", args...) stdout, stderr, err := framework.RunCmd("gcloud", args...)
if err != nil { if err != nil {
return fmt.Errorf("error restarting nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr) return fmt.Errorf("error recreating nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
} }
} }
return nil return nil
} }
func waitForNodeBootIdsToChange(c clientset.Interface, nodes []v1.Node, timeout time.Duration) error { // WaitForNodeBootIdsToChange waits for the boot ids of the given nodes to change in order to verify the node has been recreated.
func WaitForNodeBootIdsToChange(c clientset.Interface, nodes []v1.Node, timeout time.Duration) error {
errMsg := []string{} errMsg := []string{}
for i := range nodes { for i := range nodes {
node := &nodes[i] node := &nodes[i]

View File

@ -45,6 +45,7 @@ go_library(
"//test/e2e/common:go_default_library", "//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library", "//test/e2e/framework:go_default_library",
"//test/e2e/framework/gpu:go_default_library", "//test/e2e/framework/gpu:go_default_library",
"//test/e2e/framework/job:go_default_library",
"//test/e2e/framework/log:go_default_library", "//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/providers/gce:go_default_library", "//test/e2e/framework/providers/gce:go_default_library",
"//test/e2e/framework/replicaset:go_default_library", "//test/e2e/framework/replicaset:go_default_library",

View File

@ -18,6 +18,7 @@ package scheduling
import ( import (
"os" "os"
"regexp"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -27,7 +28,9 @@ import (
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/gpu" "k8s.io/kubernetes/test/e2e/framework/gpu"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
e2elog "k8s.io/kubernetes/test/e2e/framework/log" e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/providers/gce"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo"
@ -131,7 +134,7 @@ func SetupNVIDIAGPUNode(f *framework.Framework, setupResourceGatherer bool) *fra
e2elog.Logf("Using %v", dsYamlURL) e2elog.Logf("Using %v", dsYamlURL)
// Creates the DaemonSet that installs Nvidia Drivers. // Creates the DaemonSet that installs Nvidia Drivers.
ds, err := framework.DsFromManifest(dsYamlURL) ds, err := framework.DsFromManifest(dsYamlURL)
gomega.Expect(err).NotTo(gomega.HaveOccurred()) framework.ExpectNoError(err)
ds.Namespace = f.Namespace.Name ds.Namespace = f.Namespace.Name
_, err = f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Create(ds) _, err = f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Create(ds)
framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset") framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset")
@ -172,8 +175,8 @@ func testNvidiaGPUs(f *framework.Framework) {
} }
e2elog.Logf("Wait for all test pods to succeed") e2elog.Logf("Wait for all test pods to succeed")
// Wait for all pods to succeed // Wait for all pods to succeed
for _, po := range podList { for _, pod := range podList {
f.PodClient().WaitForSuccess(po.Name, 5*time.Minute) f.PodClient().WaitForSuccess(pod.Name, 5*time.Minute)
} }
e2elog.Logf("Stopping ResourceUsageGather") e2elog.Logf("Stopping ResourceUsageGather")
@ -190,3 +193,104 @@ var _ = SIGDescribe("[Feature:GPUDevicePlugin]", func() {
testNvidiaGPUs(f) testNvidiaGPUs(f)
}) })
}) })
func testNvidiaGPUsJob(f *framework.Framework) {
_ = SetupNVIDIAGPUNode(f, false)
// Job set to have 5 completions with parallelism of 1 to ensure that it lasts long enough to experience the node recreation
completions := int32(5)
ginkgo.By("Starting GPU job")
StartJob(f, completions)
job, err := jobutil.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
// make sure job is running by waiting for its first pod to start running
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
framework.ExpectNoError(err)
numNodes, err := framework.NumberOfRegisteredNodes(f.ClientSet)
framework.ExpectNoError(err)
nodes, err := framework.CheckNodesReady(f.ClientSet, numNodes, framework.NodeReadyInitialTimeout)
framework.ExpectNoError(err)
ginkgo.By("Recreating nodes")
err = gce.RecreateNodes(f.ClientSet, nodes)
framework.ExpectNoError(err)
ginkgo.By("Done recreating nodes")
ginkgo.By("Waiting for gpu job to finish")
err = jobutil.WaitForJobFinish(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err)
ginkgo.By("Done with gpu job")
gomega.Expect(job.Status.Failed).To(gomega.BeZero(), "Job pods failed during node recreation: %v", job.Status.Failed)
VerifyJobNCompletions(f, completions)
}
// StartJob starts a simple CUDA job that requests gpu and the specified number of completions
func StartJob(f *framework.Framework, completions int32) {
var activeSeconds int64 = 3600
testJob := jobutil.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, completions, &activeSeconds, 6)
testJob.Spec.Template.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
{
Name: "vector-addition",
Image: imageutils.GetE2EImage(imageutils.CudaVectorAdd),
Command: []string{"/bin/sh", "-c", "./vectorAdd && sleep 60"},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
gpuResourceName: *resource.NewQuantity(1, resource.DecimalSI),
},
},
},
},
}
ns := f.Namespace.Name
_, err := jobutil.CreateJob(f.ClientSet, ns, testJob)
framework.ExpectNoError(err)
framework.Logf("Created job %v", testJob)
}
// VerifyJobNCompletions verifies that the job has completions number of successful pods
func VerifyJobNCompletions(f *framework.Framework, completions int32) {
ns := f.Namespace.Name
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
createdPods := pods.Items
createdPodNames := podNames(createdPods)
framework.Logf("Got the following pods for job cuda-add: %v", createdPodNames)
successes := int32(0)
for _, podName := range createdPodNames {
f.PodClient().WaitForFinish(podName, 5*time.Minute)
logs, err := framework.GetPodLogs(f.ClientSet, ns, podName, "vector-addition")
framework.ExpectNoError(err, "Should be able to get logs for pod %v", podName)
regex := regexp.MustCompile("PASSED")
if regex.MatchString(logs) {
successes++
}
}
if successes != completions {
framework.Failf("Only got %v completions. Expected %v completions.", successes, completions)
}
}
func podNames(pods []v1.Pod) []string {
originalPodNames := make([]string, len(pods))
for i, p := range pods {
originalPodNames[i] = p.ObjectMeta.Name
}
return originalPodNames
}
var _ = SIGDescribe("GPUDevicePluginAcrossRecreate [Feature:Recreate]", func() {
ginkgo.BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke")
})
f := framework.NewDefaultFramework("device-plugin-gpus-recreate")
ginkgo.It("run Nvidia GPU Device Plugin tests with a recreation", func() {
testNvidiaGPUsJob(f)
})
})

View File

@ -28,7 +28,6 @@ go_library(
"//staging/src/k8s.io/api/autoscaling/v1:go_default_library", "//staging/src/k8s.io/api/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
@ -37,7 +36,6 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//test/e2e/common:go_default_library", "//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library", "//test/e2e/framework:go_default_library",
"//test/e2e/framework/gpu:go_default_library",
"//test/e2e/framework/job:go_default_library", "//test/e2e/framework/job:go_default_library",
"//test/e2e/framework/log:go_default_library", "//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/testfiles:go_default_library", "//test/e2e/framework/testfiles:go_default_library",

View File

@ -17,22 +17,18 @@ limitations under the License.
package upgrades package upgrades
import ( import (
"regexp"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/gpu"
jobutil "k8s.io/kubernetes/test/e2e/framework/job" jobutil "k8s.io/kubernetes/test/e2e/framework/job"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/scheduling" "k8s.io/kubernetes/test/e2e/scheduling"
imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo"
"github.com/onsi/gomega" "github.com/onsi/gomega"
) )
const (
completions = int32(1)
)
// NvidiaGPUUpgradeTest tests that gpu resource is available before and after // NvidiaGPUUpgradeTest tests that gpu resource is available before and after
// a cluster upgrade. // a cluster upgrade.
type NvidiaGPUUpgradeTest struct { type NvidiaGPUUpgradeTest struct {
@ -45,7 +41,7 @@ func (NvidiaGPUUpgradeTest) Name() string { return "nvidia-gpu-upgrade [sig-node
func (t *NvidiaGPUUpgradeTest) Setup(f *framework.Framework) { func (t *NvidiaGPUUpgradeTest) Setup(f *framework.Framework) {
scheduling.SetupNVIDIAGPUNode(f, false) scheduling.SetupNVIDIAGPUNode(f, false)
ginkgo.By("Creating a job requesting gpu") ginkgo.By("Creating a job requesting gpu")
t.startJob(f) scheduling.StartJob(f, completions)
} }
// Test waits for the upgrade to complete, and then verifies that the // Test waits for the upgrade to complete, and then verifies that the
@ -53,7 +49,7 @@ func (t *NvidiaGPUUpgradeTest) Setup(f *framework.Framework) {
func (t *NvidiaGPUUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) { func (t *NvidiaGPUUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
<-done <-done
ginkgo.By("Verifying gpu job success") ginkgo.By("Verifying gpu job success")
t.verifyJobPodSuccess(f) scheduling.VerifyJobNCompletions(f, completions)
if upgrade == MasterUpgrade || upgrade == ClusterUpgrade { if upgrade == MasterUpgrade || upgrade == ClusterUpgrade {
// MasterUpgrade should be totally hitless. // MasterUpgrade should be totally hitless.
job, err := jobutil.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add") job, err := jobutil.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add")
@ -66,51 +62,3 @@ func (t *NvidiaGPUUpgradeTest) Test(f *framework.Framework, done <-chan struct{}
func (t *NvidiaGPUUpgradeTest) Teardown(f *framework.Framework) { func (t *NvidiaGPUUpgradeTest) Teardown(f *framework.Framework) {
// rely on the namespace deletion to clean up everything // rely on the namespace deletion to clean up everything
} }
// startJob creates a job that requests gpu and runs a simple cuda container.
func (t *NvidiaGPUUpgradeTest) startJob(f *framework.Framework) {
var activeSeconds int64 = 3600
// Specifies 100 completions to make sure the job life spans across the upgrade.
testJob := jobutil.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, 100, &activeSeconds, 6)
testJob.Spec.Template.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
{
Name: "vector-addition",
Image: imageutils.GetE2EImage(imageutils.CudaVectorAdd),
Command: []string{"/bin/sh", "-c", "./vectorAdd && sleep 60"},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
gpu.NVIDIAGPUResourceName: *resource.NewQuantity(1, resource.DecimalSI),
},
},
},
},
}
ns := f.Namespace.Name
_, err := jobutil.CreateJob(f.ClientSet, ns, testJob)
framework.ExpectNoError(err)
e2elog.Logf("Created job %v", testJob)
ginkgo.By("Waiting for gpu job pod start")
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, ns, testJob.Name, 1)
framework.ExpectNoError(err)
ginkgo.By("Done with gpu job pod start")
}
// verifyJobPodSuccess verifies that the started cuda pod successfully passes.
func (t *NvidiaGPUUpgradeTest) verifyJobPodSuccess(f *framework.Framework) {
// Wait for client pod to complete.
ns := f.Namespace.Name
err := jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, "cuda-add", 1)
framework.ExpectNoError(err)
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
createdPod := pods.Items[0].Name
e2elog.Logf("Created pod %v", createdPod)
f.PodClient().WaitForSuccess(createdPod, 5*time.Minute)
logs, err := framework.GetPodLogs(f.ClientSet, ns, createdPod, "vector-addition")
framework.ExpectNoError(err, "Should be able to get pod logs")
e2elog.Logf("Got pod logs: %v", logs)
regex := regexp.MustCompile("PASSED")
gomega.Expect(regex.MatchString(logs)).To(gomega.BeTrue())
}