diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go
index d750a70d6fa..4cbf558505f 100644
--- a/test/e2e_node/device_plugin_test.go
+++ b/test/e2e_node/device_plugin_test.go
@@ -269,7 +269,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 			if err != nil {
 				framework.ExpectNoError(err, "getting pod resources assignment after pod restart")
 			}
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
 
 			ginkgo.By("Creating another pod")
@@ -288,9 +288,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 			if err != nil {
 				framework.ExpectNoError(err, "getting pod resources assignment after pod restart")
 			}
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod1")
-			err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2})
 			framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2")
 		})
 
@@ -308,7 +308,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 			pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
 			framework.ExpectNoError(err)
 
-			framework.Logf("testing pod: UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1))
+			framework.Logf("testing pod: pre-restart UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1))
 
 			ginkgo.By("Restarting Kubelet")
 			restartKubelet(true)
@@ -331,9 +331,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 				Should(BeTheSamePodStillRunning(pod1),
 					"the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts")
 
-			pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
+			pod2, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
 			framework.ExpectNoError(err)
-			framework.Logf("testing pod: UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1))
+			framework.Logf("testing pod: post-restart UID=%s namespace=%s name=%s ready=%v", pod2.UID, pod2.Namespace, pod2.Name, podutils.IsPodReady(pod2))
 
 			// crosscheck from the device assignment is preserved and stable from perspective of the kubelet.
 			// note we don't check again the logs of the container: the check is done at startup, the container
@@ -345,7 +345,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 				return err
 			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
 
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
 		})
 
@@ -408,7 +408,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 				return err
 			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
 
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
 		})
 
@@ -462,7 +462,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 				return err
 			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
 
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
 		})
 
@@ -504,7 +504,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 				return err
 			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
 
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
 
 			ginkgo.By("Re-Register resources by deleting the plugin pod")
@@ -533,6 +533,62 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
 				Should(BeTheSamePodStillRunning(pod1),
 					"the same pod instance not running across kubelet restarts, workload should not be perturbed by device plugins restarts")
 		})
+
+		ginkgo.It("[OrphanedPods] Ensures pods consuming devices deleted while kubelet is down are cleaned up correctly", func(ctx context.Context) {
+			podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart)
+			pod := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
+
+			deviceIDRE := "stub devices: (Dev-[0-9]+)"
+			devID, err := parseLog(ctx, f, pod.Name, pod.Name, deviceIDRE)
+			framework.ExpectNoError(err, "getting logs for pod %q", pod.Name)
+			gomega.Expect(devID).To(gomega.Not(gomega.BeEmpty()), "pod1 requested a device but started successfully without")
+
+			pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("stopping the kubelet")
+			startKubelet := stopKubelet()
+
+			// wait until the kubelet health check will fail
+			gomega.Eventually(ctx, func() bool {
+				ok := kubeletHealthCheck(kubeletHealthCheckURL)
+				framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok)
+				return ok
+			}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse())
+
+			framework.Logf("Delete the pod while the kubelet is not running")
+			// Delete pod sync by name will force delete the pod, removing it from kubelet's config
+			deletePodSyncByName(ctx, f, pod.Name)
+
+			framework.Logf("Starting the kubelet")
+			startKubelet()
+
+			// wait until the kubelet health check will succeed
+			gomega.Eventually(ctx, func() bool {
+				ok := kubeletHealthCheck(kubeletHealthCheckURL)
+				framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok)
+				return ok
+			}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue())
+
+			framework.Logf("wait for the pod %v to disappear", pod.Name)
+			gomega.Eventually(ctx, func(ctx context.Context) error {
+				err := checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace)
+				framework.Logf("pod %s/%s disappear check err=%v", pod.Namespace, pod.Name, err)
+				return err
+			}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil())
+
+			waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace)
+
+			ginkgo.By("Verifying the device assignment after device plugin restart using podresources API")
+			gomega.Eventually(ctx, func() error {
+				v1PodResources, err = getV1NodeDevices(ctx)
+				return err
+			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
+			err, allocated := checkPodResourcesAssignment(v1PodResources, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, SampleDeviceResourceName, []string{})
+			if err == nil || allocated {
+				framework.Fail(fmt.Sprintf("stale device assignment after pod deletion while kubelet was down allocated=%v error=%v", allocated, err))
+			}
+		})
 	})
 }
 
@@ -725,7 +781,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
 				return err
 			}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
 
-			err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
+			err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
 			framework.ExpectNoError(err, "inconsistent device assignment after node reboot")
 		})
 
@@ -793,7 +849,7 @@ func parseLog(ctx context.Context, f *framework.Framework, podName string, contN
 	return matches[1], nil
 }
 
-func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) error {
+func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) (error, bool) {
 	for _, podRes := range v1PodRes.PodResources {
 		if podRes.Namespace != podNamespace || podRes.Name != podName {
 			continue
@@ -805,10 +861,12 @@ func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResource
 			return matchContainerDevices(podNamespace+"/"+podName+"/"+containerName, contRes.Devices, resourceName, devs)
 		}
 	}
-	return fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName)
+	err := fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName)
+	framework.Logf("%v", err)
+	return err, false
 }
 
-func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) error {
+func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) (error, bool) {
 	expected := sets.New[string](devs...)
 	assigned := sets.New[string]()
 	for _, contDev := range contDevs {
@@ -821,9 +879,9 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta
 	assignedStr := strings.Join(assigned.UnsortedList(), ",")
 	framework.Logf("%s: devices expected %q assigned %q", ident, expectedStr, assignedStr)
 	if !assigned.Equal(expected) {
-		return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr)
+		return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr), true
 	}
-	return nil
+	return nil, true
 }
 
 // getSampleDevicePluginPod returns the Sample Device Plugin pod to be used e2e tests.
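
The patch changes checkPodResourcesAssignment (and matchContainerDevices) to return (error, bool): the boolean reports whether the podresources API listed the container at all, while the error reports whether the listed devices match the expectation. Below is a minimal standalone sketch of that caller-side contract, assuming nothing beyond the patch itself; the report type, checkAssignment, and the "e2e-ns/..." keys are illustrative stand-ins, not the real helper or the kubelet API. It shows the distinction the new [OrphanedPods] test relies on: a pod absent from the report (listed == false) versus a pod present with the wrong devices (listed == true, non-nil error).

```go
package main

import "fmt"

// report maps "namespace/pod/container" to the device IDs the kubelet reports as assigned.
// It stands in for walking kubeletpodresourcesv1.ListPodResourcesResponse in the real helper.
type report map[string][]string

// checkAssignment mirrors the new (error, bool) contract: the bool says whether the
// container appears in the report at all; the error says whether its devices match.
// The comparison is order-sensitive here for brevity; the real helper compares sets.
func checkAssignment(r report, key string, want []string) (error, bool) {
	got, ok := r[key]
	if !ok {
		return fmt.Errorf("no resources found for %s", key), false
	}
	if len(got) != len(want) {
		return fmt.Errorf("device allocation mismatch for %s: got %v want %v", key, got, want), true
	}
	for i := range want {
		if got[i] != want[i] {
			return fmt.Errorf("device allocation mismatch for %s: got %v want %v", key, got, want), true
		}
	}
	return nil, true
}

func main() {
	r := report{"e2e-ns/pod1/cnt": {"Dev-1"}}

	// A running pod should be listed with exactly the device it was allocated.
	err, listed := checkAssignment(r, "e2e-ns/pod1/cnt", []string{"Dev-1"})
	fmt.Printf("pod1:   listed=%v err=%v\n", listed, err)

	// A pod deleted while the kubelet was down must not be listed at all, so the
	// expected outcome is err != nil together with listed == false, which is the
	// condition the [OrphanedPods] test asserts before calling framework.Fail.
	err, listed = checkAssignment(r, "e2e-ns/orphan/cnt", nil)
	fmt.Printf("orphan: listed=%v err=%v\n", listed, err)
	if err == nil || listed {
		panic("stale device assignment: orphaned pod still reported")
	}
}
```

Note that the patch keeps the (error, bool) ordering rather than Go's more common value-then-error ordering; the sketch mirrors that choice so the call sites read the same as in the test file.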