Resurrect GPU tests that use Jobs

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
This commit is contained in:
Davanum Srinivas 2024-09-19 10:11:01 -04:00
parent cdfabdc86e
commit 630abc6929
No known key found for this signature in database
GPG Key ID: 80D83A796103BF59

View File

@ -18,7 +18,9 @@ package node
import (
"context"
e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
"os"
"regexp"
"time"
appsv1 "k8s.io/api/apps/v1"
@ -89,6 +91,36 @@ var _ = SIGDescribe(feature.GPUDevicePlugin, "Sanity test for Nvidia Device", fu
gomega.Expect(log).To(gomega.ContainSubstring("Matrix multiplication result:"))
gomega.Expect(log).To(gomega.ContainSubstring("Time taken for 5000x5000 matrix multiplication"))
})
f.It("should run gpu based jobs", func(ctx context.Context) {
SetupEnvironmentAndSkipIfNeeded(ctx, f, f.ClientSet)
// Job set to have 5 completions with parallelism of 1 to ensure that it lasts long enough to experience the node recreation
completions := int32(5)
ginkgo.By("Starting GPU job")
StartJob(ctx, f, completions)
job, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
// make sure job is running by waiting for its first pod to start running
err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
framework.ExpectNoError(err)
numNodes, err := e2enode.TotalRegistered(ctx, f.ClientSet)
framework.ExpectNoError(err)
_, err = e2enode.CheckReady(ctx, f.ClientSet, numNodes, framework.NodeReadyInitialTimeout)
framework.ExpectNoError(err)
ginkgo.By("Waiting for gpu job to finish")
err = e2ejob.WaitForJobFinish(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err)
ginkgo.By("Done with gpu job")
gomega.Expect(job.Status.Failed).To(gomega.BeZero(), "Job pods failed during node recreation: %v", job.Status.Failed)
VerifyJobNCompletions(ctx, f, completions)
})
})
func createAndValidatePod(ctx context.Context, f *framework.Framework, podClient *e2epod.PodClient, pod *v1.Pod) {
@ -304,3 +336,109 @@ func SetupNVIDIAGPUNode(ctx context.Context, f *framework.Framework) *e2edebug.C
return rsgather
}
// StartJob starts a simple CUDA job that requests gpu and the specified number of completions
func StartJob(ctx context.Context, f *framework.Framework, completions int32) {
var activeSeconds int64 = 3600
testJob := e2ejob.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, completions, &activeSeconds, 6)
testJob.Spec.Template.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
{
Name: "vector-addition",
Image: "cupy/cupy:v13.3.0",
Command: []string{
"python3",
"-c",
`
import cupy as cp
import numpy as np
import time
# Set the number of elements to test
num_elements_list = [10, 100, 1000, 10000, 100000, 1000000]
for num_elements in num_elements_list:
# Create random input vectors on the CPU
h_A = np.random.rand(num_elements).astype(np.float32)
h_B = np.random.rand(num_elements).astype(np.float32)
# Transfer the input vectors to the GPU
d_A = cp.asarray(h_A)
d_B = cp.asarray(h_B)
# Perform vector addition on the GPU
start_gpu = time.time()
d_C = d_A + d_B
gpu_time = time.time() - start_gpu
# Transfer the result back to the CPU
h_C = cp.asnumpy(d_C)
# Compute the expected result on the CPU
start_cpu = time.time()
h_C_expected = h_A + h_B
cpu_time = time.time() - start_cpu
# Verify the result
if np.allclose(h_C_expected, h_C, atol=1e-5):
print(f"GPU time: {gpu_time:.6f} seconds")
print(f"CPU time: {cpu_time:.6f} seconds")
print(f"GPU speedup: {cpu_time / gpu_time:.2f}x")
else:
print(f"Test FAILED for {num_elements} elements.")
# Print the first few elements for verification
print("First few elements of A:", h_A[:5])
print("First few elements of B:", h_B[:5])
print("First few elements of C:", h_C[:5])
print(f"Test PASSED")
`,
},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
e2egpu.NVIDIAGPUResourceName: *resource.NewQuantity(1, resource.DecimalSI),
},
},
},
},
}
ns := f.Namespace.Name
_, err := e2ejob.CreateJob(ctx, f.ClientSet, ns, testJob)
framework.ExpectNoError(err)
framework.Logf("Created job %v", testJob)
}
func podNames(pods []v1.Pod) []string {
originalPodNames := make([]string, len(pods))
for i, p := range pods {
originalPodNames[i] = p.ObjectMeta.Name
}
return originalPodNames
}
// VerifyJobNCompletions verifies that the job has completions number of successful pods
func VerifyJobNCompletions(ctx context.Context, f *framework.Framework, completions int32) {
ns := f.Namespace.Name
pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
createdPods := pods.Items
createdPodNames := podNames(createdPods)
framework.Logf("Got the following pods for job cuda-add: %v", createdPodNames)
successes := int32(0)
regex := regexp.MustCompile("PASSED")
for _, podName := range createdPodNames {
e2epod.NewPodClient(f).WaitForFinish(ctx, podName, 5*time.Minute)
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, ns, podName, "vector-addition")
framework.ExpectNoError(err, "Should be able to get logs for pod %v", podName)
if regex.MatchString(logs) {
successes++
}
gomega.Expect(logs).To(gomega.Not(gomega.ContainSubstring("FAILED")))
}
if successes != completions {
framework.Failf("Only got %v completions. Expected %v completions.", successes, completions)
}
}