diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go
index 2892dae1175..ad65702b569 100644
--- a/test/e2e_node/device_manager_test.go
+++ b/test/e2e_node/device_manager_test.go
@@ -24,10 +24,14 @@ import (
 	"regexp"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@@ -39,6 +43,8 @@ import (
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
+	testutils "k8s.io/kubernetes/test/utils"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
@@ -266,6 +272,131 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
 		})
 	})
+
+	ginkgo.Context("With sample device plugin", func() {
+		var deviceCount int = 2
+		var devicePluginPod *v1.Pod
+		var testPods []*v1.Pod
+
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			ginkgo.By("Wait for node to be ready")
+			gomega.Eventually(func() bool {
+				nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
+				framework.ExpectNoError(err)
+				return nodes == 1
+			}, time.Minute, time.Second).Should(gomega.BeTrue())
+
+			ginkgo.By("Scheduling a sample device plugin pod")
+			data, err := e2etestfiles.Read(SampleDevicePluginDS2YAML)
+			if err != nil {
+				framework.Fail(err.Error())
+			}
+			ds := readDaemonSetV1OrDie(data)
+
+			dp := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: sampleDevicePluginName,
+				},
+				Spec: ds.Spec.Template.Spec,
+			}
+
+			devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp)
+
+			ginkgo.By("Waiting for devices to become available on the local node")
+			gomega.Eventually(func() bool {
+				node, ready := getLocalTestNode(ctx, f)
+				return ready && numberOfSampleResources(node) > 0
+			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
+			framework.Logf("Successfully created device plugin pod")
+
+			devsLen := int64(deviceCount) // shortcut
+			ginkgo.By("Waiting for the resource exported by the sample device plugin to become available on the local node")
+			gomega.Eventually(func() bool {
+				node, ready := getLocalTestNode(ctx, f)
+				return ready &&
+					numberOfDevicesCapacity(node, resourceName) == devsLen &&
+					numberOfDevicesAllocatable(node, resourceName) == devsLen
+			}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			ginkgo.By("Deleting the device plugin pod")
+			e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute)
+
+			ginkgo.By("Deleting any Pods created by the test")
+			l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{})
+			framework.ExpectNoError(err)
+			for _, p := range l.Items {
+				if p.Namespace != f.Namespace.Name {
+					continue
+				}
+
+				framework.Logf("Deleting pod: %s", p.Name)
+				e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
+			}
+
+			restartKubelet(true)
+
+			ginkgo.By("Waiting for devices to become unavailable on the local node")
+			gomega.Eventually(func() bool {
+				node, ready := getLocalTestNode(ctx, f)
+				return ready && numberOfSampleResources(node) <= 0
+			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
+		})
+
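+		// This spec verifies device manager recovery across a kubelet restart:
+		// while pods consuming sample devices are running, the kubelet is stopped,
+		// every pod sandbox is removed out of band through the CRI, and once the
+		// kubelet comes back both the device plugin pod and the device-consuming
+		// pods are expected to become Ready again.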
ginkgo.It("should recover device plugin pod first, then pod consuming devices", func() { + var err error + podCMD := "cat /tmp/Dev-* && sleep inf" + + ginkgo.By(fmt.Sprintf("creating %d pods requiring %q", deviceCount, resourceName)) + var devReqPods []*v1.Pod + for idx := 0; idx < deviceCount; idx++ { + pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD) + devReqPods = append(devReqPods, pod) + } + testPods = e2epod.NewPodClient(f).CreateBatch(ctx, devReqPods) + + ginkgo.By("making sure all the pods are ready") + waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady) + + ginkgo.By("stopping the kubelet") + startKubelet := stopKubelet() + + ginkgo.By("stopping all the local containers - using CRI") + rs, _, err := getCRIClient() + framework.ExpectNoError(err) + sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{}) + framework.ExpectNoError(err) + for _, sandbox := range sandboxes { + gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil()) + ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id)) + + err := rs.RemovePodSandbox(ctx, sandbox.Id) + framework.ExpectNoError(err) + } + + ginkgo.By("restarting the kubelet") + startKubelet() + + ginkgo.By("waiting for the kubelet to be ready again") + // Wait for the Kubelet to be ready. + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + // TODO: here we need to tolerate pods waiting to be recreated + + ginkgo.By("making sure all the pods are ready after the recovery") + waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady) + + ginkgo.By("removing all the pods") + deleteBatch(ctx, f, testPods) + }) + + }) + }) func compareSRIOVResources(expected, got *sriovData) { @@ -351,3 +482,59 @@ func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) s sort.Strings(entries) return strings.Join(entries, ", ") } + +func waitForPodConditionBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod, conditionDesc string, timeout time.Duration, condition func(pod *v1.Pod) (bool, error)) { + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func(podNS, podName string) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + err := e2epod.WaitForPodCondition(ctx, f.ClientSet, podNS, podName, conditionDesc, timeout, condition) + framework.ExpectNoError(err, "pod %s/%s did not go running", podNS, podName) + framework.Logf("pod %s/%s running", podNS, podName) + }(pod.Namespace, pod.Name) + } + wg.Wait() +} + +func deleteBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod) { + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func(podNS, podName string) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + deletePodSyncByName(ctx, f, podName) + waitForAllContainerRemoval(ctx, podName, podNS) + }(pod.Namespace, pod.Name) + } + wg.Wait() +} + +func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod { + podName := "device-manager-test-" + string(uuid.NewUUID()) + rl := v1.ResourceList{ + v1.ResourceName(resourceName): *resource.NewQuantity(1, resource.DecimalSI), + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{{ + Image: busyboxImage, + Name: podName, 
+				// Runs the specified command in the test pod.
+				Command: []string{"sh", "-c", cmd},
+				Resources: v1.ResourceRequirements{
+					Limits:   rl,
+					Requests: rl,
+				},
+			}},
+		},
+	}
+}
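
Note (reviewer suggestion, not part of the patch): waitForPodConditionBatch and deleteBatch share the same fan-out/fan-in shape, i.e. one goroutine per pod guarded by ginkgo.GinkgoRecover and joined on a sync.WaitGroup. If more batch operations get added later, that pattern could be factored out roughly as below; this is a minimal sketch, and forEachPodConcurrently is a hypothetical name, not an existing helper in the e2e framework.

// forEachPodConcurrently runs op once per pod, each in its own goroutine,
// and waits for all of them to finish.
func forEachPodConcurrently(pods []*v1.Pod, op func(podNS, podName string)) {
	var wg sync.WaitGroup
	for _, pod := range pods {
		wg.Add(1)
		go func(podNS, podName string) {
			// Propagate ginkgo/gomega failures raised inside the goroutine.
			defer ginkgo.GinkgoRecover()
			defer wg.Done()
			op(podNS, podName)
		}(pod.Namespace, pod.Name)
	}
	wg.Wait()
}

// deleteBatch would then reduce to:
func deleteBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod) {
	forEachPodConcurrently(pods, func(podNS, podName string) {
		deletePodSyncByName(ctx, f, podName)
		waitForAllContainerRemoval(ctx, podName, podNS)
	})
}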