diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go
index a436fa45f43..899c45775dc 100644
--- a/test/e2e_node/device_plugin_test.go
+++ b/test/e2e_node/device_plugin_test.go
@@ -18,7 +18,9 @@ package e2enode
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"os"
 	"path/filepath"
 	"regexp"
 	"strings"
@@ -32,6 +34,8 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apimachinery/pkg/util/sets"
+	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
+	"k8s.io/klog/v2"
 	kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	admissionapi "k8s.io/pod-security-admission/api"
 
@@ -43,6 +47,7 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
 )
 
 var (
@@ -55,6 +60,7 @@ var _ = SIGDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:Devic
 	f := framework.NewDefaultFramework("device-plugin-errors")
 	f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
 	testDevicePlugin(f, kubeletdevicepluginv1beta1.DevicePluginPath)
+	testDevicePluginNodeReboot(f, kubeletdevicepluginv1beta1.DevicePluginPath)
 })
 
 // readDaemonSetV1OrDie reads daemonset object from bytes. Panics on error.
@@ -414,6 +420,202 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 	})
 }
 
+func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
+	ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() {
+		var devicePluginPod *v1.Pod
+		var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse
+		var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse
+		var triggerPathFile, triggerPathDir string
+		var err error
+
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			ginkgo.By("Wait for node to be ready")
+			gomega.Eventually(ctx, func(ctx context.Context) bool {
+				nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
+				framework.ExpectNoError(err)
+				return nodes == 1
+			}, time.Minute, time.Second).Should(gomega.BeTrue())
+
+			// Before we run the device plugin test, we need to ensure
+			// that the cluster is in a clean state and there are no
+			// pods running on this node.
+			// This is done in a gomega.Eventually with retries, since a prior test from a
+			// different test suite could have run, and the deletion of its resources may
+			// still be in progress.
+			// xref: https://issue.k8s.io/115381
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				v1alphaPodResources, err = getV1alpha1NodeDevices(ctx)
+				if err != nil {
+					return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err)
+				}
+
+				v1PodResources, err = getV1NodeDevices(ctx)
+				if err != nil {
+					return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err)
+				}
+
+				if len(v1alphaPodResources.PodResources) > 0 {
+					return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources)
+				}
+
+				if len(v1PodResources.PodResources) > 0 {
+					return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources)
+				}
+				return nil
+			}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed())
+
+			ginkgo.By("Setting up the directory and file for controlling registration")
+			triggerPathDir = filepath.Join(devicePluginDir, "sample")
+			if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) {
+				if err := os.Mkdir(triggerPathDir, os.ModePerm); err != nil {
+					klog.Errorf("Directory creation %s failed: %v", triggerPathDir, err)
+					panic(err)
+				}
+				klog.InfoS("Directory created successfully")
+			}
+
+			// triggerPathFile must be set (and the control file created) even when the
+			// directory already exists, otherwise the registration trigger below has no
+			// file to remove.
+			triggerPathFile = filepath.Join(triggerPathDir, "registration")
+			if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) {
+				regFile, err := os.Create(triggerPathFile)
+				if err != nil {
+					klog.Errorf("File creation %s failed: %v", triggerPathFile, err)
+					panic(err)
+				}
+				regFile.Close()
+			}
+
+			ginkgo.By("Scheduling a sample device plugin pod")
+			data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML)
+			if err != nil {
+				framework.Fail(err.Error())
+			}
+			ds := readDaemonSetV1OrDie(data)
+
+			dp := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: SampleDevicePluginName,
+				},
+				Spec: ds.Spec.Template.Spec,
+			}
+
+			devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp)
+
+			go func() {
+				// Since autoregistration is disabled for the device plugin (the
+				// REGISTER_CONTROL_FILE environment variable is set), device plugin
+				// registration needs to be triggered manually.
+				// This is done by deleting the control file at the following path:
+				// `/var/lib/kubelet/device-plugins/sample/registration`.
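+				// The sample device plugin holds off registering with the kubelet
+				// until this file is removed, which lets the test control exactly
+				// when (and whether) registration happens.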
+
+				defer ginkgo.GinkgoRecover()
+				framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile)
+				err := os.Remove(triggerPathFile)
+				framework.ExpectNoError(err)
+			}()
+
+			ginkgo.By("Waiting for devices to become available on the local node")
+			gomega.Eventually(ctx, func(ctx context.Context) bool {
+				node, ready := getLocalTestNode(ctx, f)
+				return ready && CountSampleDeviceCapacity(node) > 0
+			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
+			framework.Logf("Successfully created device plugin pod")
+
+			ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount))
+			gomega.Eventually(ctx, func(ctx context.Context) bool {
+				node, ready := getLocalTestNode(ctx, f)
+				return ready &&
+					CountSampleDeviceCapacity(node) == expectedSampleDevsAmount &&
+					CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount
+			}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			ginkgo.By("Deleting the device plugin pod")
+			e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute)
+
+			ginkgo.By("Deleting any Pods created by the test")
+			l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{})
+			framework.ExpectNoError(err)
+			for _, p := range l.Items {
+				if p.Namespace != f.Namespace.Name {
+					continue
+				}
+
+				framework.Logf("Deleting pod: %s", p.Name)
+				e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
+			}
+
+			err = os.Remove(triggerPathDir)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Waiting for devices to become unavailable on the local node")
+			gomega.Eventually(ctx, func(ctx context.Context) bool {
+				node, ready := getLocalTestNode(ctx, f)
+				return ready && CountSampleDeviceCapacity(node) <= 0
+			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
+
+			ginkgo.By("devices now unavailable on the local node")
+		})
+
+		// Simulate a node reboot scenario by removing pods through the CRI before the
+		// kubelet is restarted. In addition, we intentionally create a scenario where,
+		// after the "reboot", application pods requesting devices appear before the
+		// device plugin pod exposing those devices as a resource has re-registered
+		// itself with the kubelet. The expected behavior is that the application pod
+		// fails at admission time.
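+		//
+		// In detail, the test:
+		//  1. runs a pod consuming a sample device and records the device ID it was assigned;
+		//  2. stops the kubelet and removes all pod sandboxes through the CRI;
+		//  3. restarts the kubelet and expects the pod to fail admission, since the device
+		//     plugin has not re-registered yet;
+		//  4. queries the podresources API to verify the original device assignment
+		//     survived the restart.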
+		ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) {
+			podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
+			pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
+			deviceIDRE := "stub devices: (Dev-[0-9]+)"
+			devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE)
+			framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name)
+
+			gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")))
+
+			pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("stopping the kubelet")
+			startKubelet := stopKubelet()
+
+			ginkgo.By("stopping all the local containers - using CRI")
+			rs, _, err := getCRIClient()
+			framework.ExpectNoError(err)
+			sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{})
+			framework.ExpectNoError(err)
+			for _, sandbox := range sandboxes {
+				gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil())
+				ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id))
+
+				err := rs.RemovePodSandbox(ctx, sandbox.Id)
+				framework.ExpectNoError(err)
+			}
+
+			ginkgo.By("restarting the kubelet")
+			startKubelet()
+
+			ginkgo.By("Wait for node to be ready again")
+			framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute))
+
+			ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
+			gomega.Eventually(ctx, getPod).
+				WithArguments(f, pod1.Name).
+				WithTimeout(time.Minute).
+				Should(HaveFailedWithAdmissionError(),
+					"the pod started successfully, when it should have failed with an admission error")
+
+			// Crosscheck that the device assignment is preserved and stable from the
+			// kubelet's perspective. Note we don't check the container logs again: that
+			// check is done at startup, and the container is never restarted (it runs
+			// "forever" on this test's timescale), so re-doing it would be pointless.
+			ginkgo.By("Verifying the device assignment after kubelet restart using podresources API")
+			gomega.Eventually(ctx, func() error {
+				v1PodResources, err = getV1NodeDevices(ctx)
+				return err
+			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
+
+			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			framework.ExpectNoError(err, "inconsistent device assignment after node reboot")
+		})
+	})
+}
+
 // makeBusyboxPod returns a simple Pod spec with a busybox container
 // that requests SampleDeviceResourceName and runs the specified command.
 func makeBusyboxPod(SampleDeviceResourceName, cmd string) *v1.Pod {