From dc1a5926321410f141f48b9edd3397fed2d0f93e Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Wed, 21 Dec 2022 12:55:47 +0000 Subject: [PATCH 1/7] node: device-mgr: Handle recovery by checking if healthy devices exist In case of node reboot/kubelet restart, the flow of events involves obtaining the state from the checkpoint file followed by setting the `healthyDevices`/`unhealthyDevices` to their zero value. This is done to allow the device plugin to re-register itself so that capacity can be updated appropriately. During the allocation phase, we need to check that the resources requested by the pod have been registered AND that healthy devices are present on the node to be allocated. We also need to move this check above the `needed==0` check, where `needed` is `required` minus the devices already allocated to the container (obtained from the checkpoint file), because even in cases where no additional devices have to be allocated (as they were pre-allocated), we still need to make sure the devices that were previously allocated are healthy. Signed-off-by: Swati Sehgal --- pkg/kubelet/cm/devicemanager/manager.go | 24 ++++- pkg/kubelet/cm/devicemanager/manager_test.go | 96 ++++++++++++++++++++ 2 files changed, 115 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 8cb57aa8190..7499de4460f 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -544,15 +544,29 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", string(podUID), contName, resource, devices.Len(), required) } } + + klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName) + healthyDevices, hasRegistered := m.healthyDevices[resource] + + // Check if resource registered with devicemanager + if !hasRegistered { + return nil, fmt.Errorf("cannot allocate unregistered device %s", resource) + } + + // Check if registered resource has healthy devices + if healthyDevices.Len() == 0 { + return nil, fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resource) + } + + // Check if all the previously allocated devices are healthy + if !healthyDevices.IsSuperset(devices) { + return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource) + } + if needed == 0 { // No change, no work. return nil, nil } - klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName) - // Check if resource registered with devicemanager - if _, ok := m.healthyDevices[resource]; !ok { - return nil, fmt.Errorf("can't allocate unregistered device %s", resource) - } // Declare the list of allocated devices. // This will be populated and returned below.
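To make the intent of the reordering explicit: the registration, healthy-device and superset checks now run before the `needed == 0` early return, so a container whose devices were restored from the checkpoint but have since become unhealthy is rejected at admission rather than being silently readmitted. The following standalone sketch (illustration only, not part of the patch; `checkAllocatable` and its arguments are made-up names) condenses that decision flow using the same `k8s.io/apimachinery/pkg/util/sets` primitives the device manager uses:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// checkAllocatable mirrors the ordering introduced above: registration, healthy-device and
// superset checks come before the "nothing new to allocate" shortcut, so devices restored
// from the checkpoint that are no longer healthy still fail admission.
func checkAllocatable(healthyByResource map[string]sets.String, resource string, allocated sets.String, needed int) error {
	healthy, registered := healthyByResource[resource]
	if !registered {
		return fmt.Errorf("cannot allocate unregistered device %s", resource)
	}
	if healthy.Len() == 0 {
		return fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resource)
	}
	if !healthy.IsSuperset(allocated) {
		return fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
	}
	if needed == 0 {
		// No change, no work.
		return nil
	}
	// ...continue with the allocation of `needed` additional devices...
	return nil
}

func main() {
	healthy := map[string]sets.String{"domain1.com/resource1": sets.NewString("dev1")}
	// dev2 came from the checkpoint but is no longer healthy, so this fails even though needed == 0.
	fmt.Println(checkAllocatable(healthy, "domain1.com/resource1", sets.NewString("dev1", "dev2"), 0))
}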
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 1a91ae2ee0c..56881f9e25a 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -993,6 +993,102 @@ func TestPodContainerDeviceAllocation(t *testing.T) { } +func TestPodContainerDeviceToAllocate(t *testing.T) { + resourceName1 := "domain1.com/resource1" + resourceName2 := "domain2.com/resource2" + resourceName3 := "domain2.com/resource3" + as := require.New(t) + tmpDir, err := os.MkdirTemp("", "checkpoint") + as.Nil(err) + defer os.RemoveAll(tmpDir) + + testManager := &ManagerImpl{ + endpoints: make(map[string]endpointInfo), + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: newPodDevices(), + } + + testManager.podDevices.insert("pod1", "con1", resourceName1, + constructDevices([]string{"dev1", "dev2"}), + constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}, + map[string]string{"/home/r2lib1": "/usr/r2lib1"}, + map[string]string{"r2devices": "dev1 dev2"})) + testManager.podDevices.insert("pod2", "con2", resourceName2, + checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, + constructAllocResp(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testManager.podDevices.insert("pod3", "con3", resourceName3, + checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, + constructAllocResp(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + + // no healthy devices for resourceName1 and devices corresponding to + // resource2 are intentionally omitted to simulate that the resource + // hasn't been registered. 
+ testManager.healthyDevices[resourceName1] = sets.NewString() + testManager.healthyDevices[resourceName3] = sets.NewString() + // dev5 is no longer in the list of healthy devices + testManager.healthyDevices[resourceName3].Insert("dev7") + testManager.healthyDevices[resourceName3].Insert("dev8") + + testCases := []struct { + description string + podUID string + contName string + resource string + required int + reusableDevices sets.String + expectedAllocatedDevices sets.String + expErr error + }{ + { + description: "Admission error in case no healthy devices to allocate present", + podUID: "pod1", + contName: "con1", + resource: resourceName1, + required: 2, + reusableDevices: sets.NewString(), + expectedAllocatedDevices: nil, + expErr: fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resourceName1), + }, + { + description: "Admission error in case resource is not registered", + podUID: "pod2", + contName: "con2", + resource: resourceName2, + required: 1, + reusableDevices: sets.NewString(), + expectedAllocatedDevices: nil, + expErr: fmt.Errorf("cannot allocate unregistered device %s", resourceName2), + }, + { + description: "Admission error in case devices previously allocated are no longer healthy", + podUID: "pod3", + contName: "con3", + resource: resourceName3, + required: 1, + reusableDevices: sets.NewString(), + expectedAllocatedDevices: nil, + expErr: fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resourceName3), + }, + } + + for _, testCase := range testCases { + allocDevices, err := testManager.devicesToAllocate(testCase.podUID, testCase.contName, testCase.resource, testCase.required, testCase.reusableDevices) + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("devicePluginManager error (%v). expected error: %v but got: %v", + testCase.description, testCase.expErr, err) + } + if !reflect.DeepEqual(allocDevices, testCase.expectedAllocatedDevices) { + t.Errorf("devicePluginManager error (%v). expected allocated devices: %v but got: %v", + testCase.description, testCase.expectedAllocatedDevices, allocDevices) + } + } + +} + func TestGetDeviceRunContainerOptions(t *testing.T) { res1 := TestResource{ resourceName: "domain1.com/resource1", From d509e79837d287cd0805255c68d8ee3b891815ec Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Sat, 24 Dec 2022 01:29:45 +0000 Subject: [PATCH 2/7] node: device-mgr: e2e: Implement End to end test This commit reuses e2e tests implemented as part of https://github.com/kubernetes/kubernetes/pull/110729. The commit is borrowed from the aforementioned PR as is to preserve authorship. A subsequent commit will update the end to end test to simulate the problem this PR is trying to solve by reproducing issue 109595.
Co-authored-by: Francesco Romani Signed-off-by: Swati Sehgal --- test/e2e_node/device_manager_test.go | 187 +++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go index 2892dae1175..ad65702b569 100644 --- a/test/e2e_node/device_manager_test.go +++ b/test/e2e_node/device_manager_test.go @@ -24,10 +24,14 @@ import ( "regexp" "sort" "strings" + "sync" "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -39,6 +43,8 @@ import ( e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" + testutils "k8s.io/kubernetes/test/utils" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -266,6 +272,131 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur }) }) + + ginkgo.Context("With sample device plugin", func(ctx context.Context) { + var deviceCount int = 2 + var devicePluginPod *v1.Pod + var testPods []*v1.Pod + + ginkgo.BeforeEach(func() { + ginkgo.By("Wait for node to be ready") + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + ginkgo.By("Scheduling a sample device plugin pod") + data, err := e2etestfiles.Read(SampleDevicePluginDS2YAML) + if err != nil { + framework.Fail(err.Error()) + } + ds := readDaemonSetV1OrDie(data) + + dp := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: sampleDevicePluginName, + }, + Spec: ds.Spec.Template.Spec, + } + + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + + ginkgo.By("Waiting for devices to become available on the local node") + gomega.Eventually(func() bool { + node, ready := getLocalTestNode(ctx, f) + return ready && numberOfSampleResources(node) > 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + framework.Logf("Successfully created device plugin pod") + + devsLen := int64(deviceCount) // shortcut + ginkgo.By("Waiting for the resource exported by the sample device plugin to become available on the local node") + gomega.Eventually(func() bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + numberOfDevicesCapacity(node, resourceName) == devsLen && + numberOfDevicesAllocatable(node, resourceName) == devsLen + }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) + }) + + ginkgo.AfterEach(func() { + ginkgo.By("Deleting the device plugin pod") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) + } + + restartKubelet(true) + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(func() bool { + node, 
ready := getLocalTestNode(ctx, f) + return ready && numberOfSampleResources(node) <= 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + }) + + ginkgo.It("should recover device plugin pod first, then pod consuming devices", func() { + var err error + podCMD := "cat /tmp/Dev-* && sleep inf" + + ginkgo.By(fmt.Sprintf("creating %d pods requiring %q", deviceCount, resourceName)) + var devReqPods []*v1.Pod + for idx := 0; idx < deviceCount; idx++ { + pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD) + devReqPods = append(devReqPods, pod) + } + testPods = e2epod.NewPodClient(f).CreateBatch(ctx, devReqPods) + + ginkgo.By("making sure all the pods are ready") + waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady) + + ginkgo.By("stopping the kubelet") + startKubelet := stopKubelet() + + ginkgo.By("stopping all the local containers - using CRI") + rs, _, err := getCRIClient() + framework.ExpectNoError(err) + sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{}) + framework.ExpectNoError(err) + for _, sandbox := range sandboxes { + gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil()) + ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id)) + + err := rs.RemovePodSandbox(ctx, sandbox.Id) + framework.ExpectNoError(err) + } + + ginkgo.By("restarting the kubelet") + startKubelet() + + ginkgo.By("waiting for the kubelet to be ready again") + // Wait for the Kubelet to be ready. + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + // TODO: here we need to tolerate pods waiting to be recreated + + ginkgo.By("making sure all the pods are ready after the recovery") + waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady) + + ginkgo.By("removing all the pods") + deleteBatch(ctx, f, testPods) + }) + + }) + }) func compareSRIOVResources(expected, got *sriovData) { @@ -351,3 +482,59 @@ func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) s sort.Strings(entries) return strings.Join(entries, ", ") } + +func waitForPodConditionBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod, conditionDesc string, timeout time.Duration, condition func(pod *v1.Pod) (bool, error)) { + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func(podNS, podName string) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + err := e2epod.WaitForPodCondition(ctx, f.ClientSet, podNS, podName, conditionDesc, timeout, condition) + framework.ExpectNoError(err, "pod %s/%s did not go running", podNS, podName) + framework.Logf("pod %s/%s running", podNS, podName) + }(pod.Namespace, pod.Name) + } + wg.Wait() +} + +func deleteBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod) { + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func(podNS, podName string) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + deletePodSyncByName(ctx, f, podName) + waitForAllContainerRemoval(ctx, podName, podNS) + }(pod.Namespace, pod.Name) + } + wg.Wait() +} + +func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod { + podName := "device-manager-test-" + string(uuid.NewUUID()) + rl := v1.ResourceList{ + v1.ResourceName(resourceName): *resource.NewQuantity(1, resource.DecimalSI), + } + return &v1.Pod{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{{ + Image: busyboxImage, + Name: podName, + // Runs the specified command in the test pod. + Command: []string{"sh", "-c", cmd}, + Resources: v1.ResourceRequirements{ + Limits: rl, + Requests: rl, + }, + }}, + }, + } +} From 282a6a80b9299bb729fedce62d1d4980c8e298c1 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Sat, 24 Dec 2022 02:15:33 +0000 Subject: [PATCH 3/7] node: device-mgr: e2e: Update the e2e test to reproduce issue:109595 Breakdown of the steps implemented as part of this e2e test is as follows: 1. Create a file `registration` at path `/var/lib/kubelet/device-plugins/sample/` 2. Create sample device plugin with an environment variable with `REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration` that waits for a client to delete the control file. 3. Trigger plugin registeration by deleting the abovementioned directory. 4. Create a test pod requesting devices exposed by the device plugin. 5. Stop kubelet. 6. Remove pods using CRI to ensure new pods are created after kubelet restart. 7. Restart kubelet. 8. Wait for the sample device plugin pod to be running. In this case, the registration is not triggered. 9. Ensure that resource capacity/allocatable exported by the device plugin is zero. 10. The test pod should fail with `UnexpectedAdmissionError` 11. Delete the test pod. 12. Delete the sample device plugin pod. 13. Remove `/var/lib/kubelet/device-plugins/sample/` and its content, the directory created to control registration Signed-off-by: Swati Sehgal --- test/e2e_node/device_manager_test.go | 397 ++++++++++++++++++++------- test/e2e_node/image_list.go | 17 +- 2 files changed, 312 insertions(+), 102 deletions(-) diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go index ad65702b569..40ab8204c9b 100644 --- a/test/e2e_node/device_manager_test.go +++ b/test/e2e_node/device_manager_test.go @@ -18,13 +18,13 @@ package e2enode import ( "context" + "errors" "fmt" "os" "path/filepath" "regexp" "sort" "strings" - "sync" "time" v1 "k8s.io/api/core/v1" @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -48,6 +49,8 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" + "github.com/onsi/gomega/types" ) const ( @@ -273,21 +276,84 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur }) - ginkgo.Context("With sample device plugin", func(ctx context.Context) { + /* + This end to end test is to simulate a scenario where after kubelet restart/node + reboot application pods requesting devices appear before the device plugin + pod exposing those devices as resources. + + The happy path is where after node reboot/ kubelet restart, the device plugin pod + appears before the application pod. This PR and this e2e test + aims to tackle the scenario where device plugin either does not appear first + or doesn't get the chance to re-register itself. 
+ + Since there is no way of controlling the order in which the pods appear after + kubelet restart/node reboot, we can't guarantee that the application pod + recovers before device plugin pod (the scenario we want to exercise here). + If the device plugin pod is recovered before the test pod, we still can + meaningfully reproduce the scenario by NOT sending the registration command. + To do so sample device plugin is enhanced. For implementation details, refer to: + `test/images/sample-device-plugin/sampledeviceplugin.go`. This enhancement + allows auto-registration of the plugin to be controlled with the help of an environment + variable: REGISTER_CONTROL_FILE. By default this environment variable is not present + and the device plugin autoregisters to kubelet. For this e2e test, we use sample device + plugin spec with REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration + to allow manual registeration of the plugin to allow an application pod (requesting devices) + to successfully run on the node followed by kubelet restart where device plugin doesn't + register and the application pod fails with admission error. + + Breakdown of the steps implemented as part of this e2e test is as follows: + 1. Create a file `registration` at path `/var/lib/kubelet/device-plugins/sample/` + 2. Create sample device plugin with an environment variable with + `REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration` that + waits for a client to delete the control file. + 3. Trigger plugin registeration by deleting the abovementioned directory. + 4. Create a test pod requesting devices exposed by the device plugin. + 5. Stop kubelet. + 6. Remove pods using CRI to ensure new pods are created after kubelet restart. + 7. Restart kubelet. + 8. Wait for the sample device plugin pod to be running. In this case, + the registration is not triggered. + 9. Ensure that resource capacity/allocatable exported by the device plugin is zero. + 10. The test pod should fail with `UnexpectedAdmissionError` + 11. Delete the test pod. + 12. Delete the sample device plugin pod. + 13. Remove `/var/lib/kubelet/device-plugins/sample/` and its content, the directory created to control registration + */ + ginkgo.Context("With sample device plugin [Serial] [Disruptive]", func() { var deviceCount int = 2 var devicePluginPod *v1.Pod - var testPods []*v1.Pod + var triggerPathFile, triggerPathDir string - ginkgo.BeforeEach(func() { + // this test wants to reproduce what happened in https://github.com/kubernetes/kubernetes/issues/109595 + ginkgo.BeforeEach(func(ctx context.Context) { ginkgo.By("Wait for node to be ready") - gomega.Eventually(func() bool { - nodes, err := e2enode.TotalReady(ctx, f.ClientSet) - framework.ExpectNoError(err) - return nodes == 1 - }, time.Minute, time.Second).Should(gomega.BeTrue()) + gomega.Eventually(ctx, e2enode.TotalReady). + WithArguments(f.ClientSet). + WithTimeout(time.Minute). 
+ Should(gomega.BeEquivalentTo(1)) + + ginkgo.By("Setting up the directory and file for controlling registration") + triggerPathDir = filepath.Join(devicePluginDir, "sample") + if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) { + err := os.Mkdir(triggerPathDir, os.ModePerm) + if err != nil { + klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err) + panic(err) + } + klog.InfoS("Directory created successfully") + + triggerPathFile = filepath.Join(triggerPathDir, "registration") + if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) { + _, err = os.Create(triggerPathFile) + if err != nil { + klog.Errorf("File creation %s failed: %v ", triggerPathFile, err) + panic(err) + } + } + } ginkgo.By("Scheduling a sample device plugin pod") - data, err := e2etestfiles.Read(SampleDevicePluginDS2YAML) + data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML) if err != nil { framework.Fail(err.Error()) } @@ -302,62 +368,51 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + go func() { + // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE + // environment variable is specified), device plugin registration needs to be triggerred + // manually. + // This is done by deleting the control file at the following path: + // `/var/lib/kubelet/device-plugins/sample/registration`. + + defer ginkgo.GinkgoRecover() + framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile) + err := os.Remove(triggerPathFile) + framework.ExpectNoError(err) + }() + ginkgo.By("Waiting for devices to become available on the local node") - gomega.Eventually(func() bool { - node, ready := getLocalTestNode(ctx, f) - return ready && numberOfSampleResources(node) > 0 - }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + + gomega.Eventually(ctx, isNodeReadyWithSampleResources). + WithArguments(f). + WithTimeout(5 * time.Minute). + Should(BeReady()) + framework.Logf("Successfully created device plugin pod") devsLen := int64(deviceCount) // shortcut ginkgo.By("Waiting for the resource exported by the sample device plugin to become available on the local node") - gomega.Eventually(func() bool { - node, ready := getLocalTestNode(ctx, f) - return ready && - numberOfDevicesCapacity(node, resourceName) == devsLen && - numberOfDevicesAllocatable(node, resourceName) == devsLen - }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) + + gomega.Eventually(ctx, isNodeReadyWithAllocatableSampleResources). + WithArguments(f, devsLen). + WithTimeout(5 * time.Minute). 
+ Should(HaveAllocatableDevices()) }) - ginkgo.AfterEach(func() { - ginkgo.By("Deleting the device plugin pod") - e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) - - ginkgo.By("Deleting any Pods created by the test") - l, err := e2epod.NewPodClient(f).List(context.TODO(), metav1.ListOptions{}) - framework.ExpectNoError(err) - for _, p := range l.Items { - if p.Namespace != f.Namespace.Name { - continue - } - - framework.Logf("Deleting pod: %s", p.Name) - e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) - } - - restartKubelet(true) - - ginkgo.By("Waiting for devices to become unavailable on the local node") - gomega.Eventually(func() bool { - node, ready := getLocalTestNode(ctx, f) - return ready && numberOfSampleResources(node) <= 0 - }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) - }) - - ginkgo.It("should recover device plugin pod first, then pod consuming devices", func() { + ginkgo.It("should deploy pod consuming devices first but fail with admission error after kubelet restart in case device plugin hasn't re-registered", func(ctx context.Context) { var err error - podCMD := "cat /tmp/Dev-* && sleep inf" + podCMD := "while true; do sleep 1000; done;" - ginkgo.By(fmt.Sprintf("creating %d pods requiring %q", deviceCount, resourceName)) - var devReqPods []*v1.Pod - for idx := 0; idx < deviceCount; idx++ { - pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD) - devReqPods = append(devReqPods, pod) - } - testPods = e2epod.NewPodClient(f).CreateBatch(ctx, devReqPods) + ginkgo.By(fmt.Sprintf("creating a pods requiring %d %q", deviceCount, resourceName)) + + pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD) + testPod := e2epod.NewPodClient(f).CreateSync(ctx, pod) ginkgo.By("making sure all the pods are ready") - waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady) + + err = e2epod.WaitForPodCondition(ctx, f.ClientSet, testPod.Namespace, testPod.Name, "Ready", 120*time.Second, testutils.PodRunningReady) + framework.ExpectNoError(err, "pod %s/%s did not go running", testPod.Namespace, testPod.Name) + framework.Logf("pod %s/%s running", testPod.Namespace, testPod.Name) ginkgo.By("stopping the kubelet") startKubelet := stopKubelet() @@ -380,19 +435,75 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur ginkgo.By("waiting for the kubelet to be ready again") // Wait for the Kubelet to be ready. - gomega.Eventually(func() bool { - nodes, err := e2enode.TotalReady(ctx, f.ClientSet) - framework.ExpectNoError(err) - return nodes == 1 - }, time.Minute, time.Second).Should(gomega.BeTrue()) - // TODO: here we need to tolerate pods waiting to be recreated + gomega.Eventually(ctx, e2enode.TotalReady). + WithArguments(f.ClientSet). + WithTimeout(2 * time.Minute). 
+ Should(gomega.BeEquivalentTo(1)) ginkgo.By("making sure all the pods are ready after the recovery") - waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady) - ginkgo.By("removing all the pods") - deleteBatch(ctx, f, testPods) + var devicePluginPodAfterRestart *v1.Pod + + devicePluginPodAfterRestart, err = e2epod.NewPodClient(f).Get(ctx, devicePluginPod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + err = e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name, "Ready", 120*time.Second, testutils.PodRunningReady) + framework.ExpectNoError(err, "pod %s/%s did not go running", devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name) + framework.Logf("pod %s/%s running", devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name) + + ginkgo.By("Waiting for the resource capacity/allocatable exported by the sample device plugin to become zero") + + // The device plugin pod has restarted but has not re-registered to kubelet (as AUTO_REGISTER= false) + // and registration wasn't triggered manually (by writing to the unix socket exposed at + // `/var/lib/kubelet/device-plugins/registered`). Because of this, the capacity and allocatable corresponding + // to the resource exposed by the device plugin should be zero. + + gomega.Eventually(ctx, isNodeReadyWithAllocatableSampleResources). + WithArguments(f, int64(0)). + WithTimeout(5 * time.Minute). + Should(HaveAllocatableDevices()) + + ginkgo.By("Checking that pod requesting devices failed to start because of admission error") + + // NOTE: The device plugin won't re-register again and this is intentional. + // Because of this, the testpod (requesting a device) should fail with an admission error. + + gomega.Eventually(ctx, getPod). + WithArguments(f, testPod.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + ginkgo.By("removing application pods") + e2epod.NewPodClient(f).DeleteSync(ctx, testPod.Name, metav1.DeleteOptions{}, 2*time.Minute) + }) + + ginkgo.AfterEach(func(ctx context.Context) { + ginkgo.By("Deleting the device plugin pod") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) + + ginkgo.By("Deleting the directory and file setup for controlling registration") + err := os.RemoveAll(triggerPathDir) + framework.ExpectNoError(err) + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) + } + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(ctx, isNodeReadyWithoutSampleResources). + WithArguments(f). + WithTimeout(5 * time.Minute). 
+ Should(BeReady()) }) }) @@ -483,41 +594,10 @@ func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) s return strings.Join(entries, ", ") } -func waitForPodConditionBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod, conditionDesc string, timeout time.Duration, condition func(pod *v1.Pod) (bool, error)) { - var wg sync.WaitGroup - for _, pod := range pods { - wg.Add(1) - go func(podNS, podName string) { - defer ginkgo.GinkgoRecover() - defer wg.Done() - - err := e2epod.WaitForPodCondition(ctx, f.ClientSet, podNS, podName, conditionDesc, timeout, condition) - framework.ExpectNoError(err, "pod %s/%s did not go running", podNS, podName) - framework.Logf("pod %s/%s running", podNS, podName) - }(pod.Namespace, pod.Name) - } - wg.Wait() -} - -func deleteBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod) { - var wg sync.WaitGroup - for _, pod := range pods { - wg.Add(1) - go func(podNS, podName string) { - defer ginkgo.GinkgoRecover() - defer wg.Done() - - deletePodSyncByName(ctx, f, podName) - waitForAllContainerRemoval(ctx, podName, podNS) - }(pod.Namespace, pod.Name) - } - wg.Wait() -} - func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod { podName := "device-manager-test-" + string(uuid.NewUUID()) rl := v1.ResourceList{ - v1.ResourceName(resourceName): *resource.NewQuantity(1, resource.DecimalSI), + v1.ResourceName(resourceName): *resource.NewQuantity(2, resource.DecimalSI), } return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -538,3 +618,128 @@ func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod { }, } } + +// BeReady verifies that a node is ready and devices have registered. +func BeReady() types.GomegaMatcher { + return gomega.And( + // This additional matcher checks for the final error condition. + gcustom.MakeMatcher(func(ready bool) (bool, error) { + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + return true, nil + }), + BeInReadyPhase(true), + ) +} + +// BeInReadyPhase matches if node is ready i.e. ready is true. +func BeInReadyPhase(isReady bool) types.GomegaMatcher { + return gcustom.MakeMatcher(func(ready bool) (bool, error) { + return ready == isReady, nil + }).WithTemplate("expected Node Ready {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(isReady) +} + +func isNodeReadyWithSampleResources(ctx context.Context, f *framework.Framework) (bool, error) { + node, ready := getLocalTestNode(ctx, f) + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + + if numberOfSampleResources(node) <= 0 { + return false, fmt.Errorf("expected devices to be advertised") + } + return true, nil +} + +// HaveAllocatableDevices verifies that a node has allocatable devices. +func HaveAllocatableDevices() types.GomegaMatcher { + return gomega.And( + // This additional matcher checks for the final error condition. + gcustom.MakeMatcher(func(hasAllocatable bool) (bool, error) { + if !hasAllocatable { + return false, fmt.Errorf("expected node to be have allocatable devices=%t", hasAllocatable) + } + return true, nil + }), + hasAllocatable(true), + ) +} + +// hasAllocatable matches if node is ready i.e. ready is true. 
+func hasAllocatable(hasAllocatable bool) types.GomegaMatcher { + return gcustom.MakeMatcher(func(hasAllocatableDevices bool) (bool, error) { + return hasAllocatableDevices == hasAllocatable, nil + }).WithTemplate("expected Node with allocatable {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(hasAllocatable) +} + +func isNodeReadyWithAllocatableSampleResources(ctx context.Context, f *framework.Framework, devCount int64) (bool, error) { + node, ready := getLocalTestNode(ctx, f) + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + + if numberOfDevicesCapacity(node, resourceName) != devCount { + return false, fmt.Errorf("expected devices capacity to be: %d", devCount) + } + + if numberOfDevicesAllocatable(node, resourceName) != devCount { + return false, fmt.Errorf("expected devices allocatable to be: %d", devCount) + } + return true, nil +} + +func isNodeReadyWithoutSampleResources(ctx context.Context, f *framework.Framework) (bool, error) { + node, ready := getLocalTestNode(ctx, f) + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + + if numberOfSampleResources(node) > 0 { + return false, fmt.Errorf("expected devices to be not present") + } + return true, nil +} + +// HaveFailedWithAdmissionError verifies that a pod fails at admission. +func HaveFailedWithAdmissionError() types.GomegaMatcher { + return gomega.And( + gcustom.MakeMatcher(func(hasFailed bool) (bool, error) { + if !hasFailed { + return false, fmt.Errorf("expected pod to have failed=%t", hasFailed) + } + return true, nil + }), + hasFailed(true), + ) +} + +// hasFailed matches if pod has failed. +func hasFailed(hasFailed bool) types.GomegaMatcher { + return gcustom.MakeMatcher(func(hasPodFailed bool) (bool, error) { + return hasPodFailed == hasFailed, nil + }).WithTemplate("expected Pod failed {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(hasFailed) +} + +func getPod(ctx context.Context, f *framework.Framework, podName string) (bool, error) { + + pod, err := e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("expected node to get pod=%q got err=%q", pod.Name, err) + } + + expectedStatusReason := "UnexpectedAdmissionError" + expectedStatusMessage := "Allocate failed due to no healthy devices present; cannot allocate unhealthy devices" + + // This additional matcher checks for the final error condition. 
+ if pod.Status.Phase != v1.PodFailed { + return false, fmt.Errorf("expected pod to reach phase %q, got final phase %q instead.", v1.PodFailed, pod.Status.Phase) + } + if pod.Status.Reason != expectedStatusReason { + return false, fmt.Errorf("expected pod status reason to be %q, got %q instead.", expectedStatusReason, pod.Status.Reason) + } + if !strings.Contains(pod.Status.Message, expectedStatusMessage) { + return false, fmt.Errorf("expected pod status reason to contain %q, got %q instead.", expectedStatusMessage, pod.Status.Message) + } + return true, nil +} diff --git a/test/e2e_node/image_list.go b/test/e2e_node/image_list.go index 611590642bc..e6e7da29766 100644 --- a/test/e2e_node/image_list.go +++ b/test/e2e_node/image_list.go @@ -87,11 +87,16 @@ func updateImageAllowList(ctx context.Context) { } else { e2epod.ImagePrePullList.Insert(gpuDevicePluginImage) } - if samplePluginImage, err := getSampleDevicePluginImage(); err != nil { + if samplePluginImage, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginDSYAML); err != nil { klog.Errorln(err) } else { e2epod.ImagePrePullList.Insert(samplePluginImage) } + if samplePluginImageCtrlReg, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginControlRegistrationDSYAML); err != nil { + klog.Errorln(err) + } else { + e2epod.ImagePrePullList.Insert(samplePluginImageCtrlReg) + } } func getNodeProblemDetectorImage() string { @@ -222,19 +227,19 @@ func getGPUDevicePluginImage(ctx context.Context) (string, error) { return ds.Spec.Template.Spec.Containers[0].Image, nil } -func getSampleDevicePluginImage() (string, error) { - data, err := e2etestfiles.Read(SampleDevicePluginDSYAML) +func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) { + data, err := e2etestfiles.Read(dsYamlPath) if err != nil { - return "", fmt.Errorf("failed to read the sample plugin yaml: %w", err) + return "", fmt.Errorf("failed to read the daemonset yaml: %w", err) } ds, err := e2emanifest.DaemonSetFromData(data) if err != nil { - return "", fmt.Errorf("failed to parse daemon set for sample plugin: %w", err) + return "", fmt.Errorf("failed to parse daemonset yaml: %w", err) } if len(ds.Spec.Template.Spec.Containers) < 1 { - return "", fmt.Errorf("failed to parse the sample plugin image: cannot extract the container from YAML") + return "", fmt.Errorf("failed to parse the container image: cannot extract the container from YAML") } return ds.Spec.Template.Spec.Containers[0].Image, nil } From 9697573703376cfc3004499d4bbd8deb16397f22 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Mon, 27 Feb 2023 13:34:59 +0000 Subject: [PATCH 4/7] node: device-mgr: e2e: adapt to sample device plugin refactoring These updates are to adapt to the sample device plugin refactoring done here: https://github.com/kubernetes/kubernetes/pull/115926/commits/92e00203e027bf88ddc695fc17e80c913a6b178a. 
Signed-off-by: Swati Sehgal --- test/e2e_node/device_manager_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go index 40ab8204c9b..95434849d34 100644 --- a/test/e2e_node/device_manager_test.go +++ b/test/e2e_node/device_manager_test.go @@ -361,7 +361,7 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur dp := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: sampleDevicePluginName, + Name: SampleDevicePluginName, }, Spec: ds.Spec.Template.Spec, } @@ -403,9 +403,9 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur var err error podCMD := "while true; do sleep 1000; done;" - ginkgo.By(fmt.Sprintf("creating a pods requiring %d %q", deviceCount, resourceName)) + ginkgo.By(fmt.Sprintf("creating a pods requiring %d %q", deviceCount, SampleDeviceResourceName)) - pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD) + pod := makeBusyboxDeviceRequiringPod(SampleDeviceResourceName, podCMD) testPod := e2epod.NewPodClient(f).CreateSync(ctx, pod) ginkgo.By("making sure all the pods are ready") @@ -646,7 +646,7 @@ func isNodeReadyWithSampleResources(ctx context.Context, f *framework.Framework) return false, fmt.Errorf("expected node to be ready=%t", ready) } - if numberOfSampleResources(node) <= 0 { + if CountSampleDeviceCapacity(node) <= 0 { return false, fmt.Errorf("expected devices to be advertised") } return true, nil @@ -679,11 +679,11 @@ func isNodeReadyWithAllocatableSampleResources(ctx context.Context, f *framework) return false, fmt.Errorf("expected node to be ready=%t", ready) } - if numberOfDevicesCapacity(node, resourceName) != devCount { + if CountSampleDeviceCapacity(node) != devCount { return false, fmt.Errorf("expected devices capacity to be: %d", devCount) } - if numberOfDevicesAllocatable(node, resourceName) != devCount { + if CountSampleDeviceAllocatable(node) != devCount { return false, fmt.Errorf("expected devices allocatable to be: %d", devCount) } return true, nil @@ -695,7 +695,7 @@ func isNodeReadyWithoutSampleResources(ctx context.Context, f *framework.Framewo) return false, fmt.Errorf("expected node to be ready=%t", ready) } - if numberOfSampleResources(node) > 0 { + if CountSampleDeviceCapacity(node) > 0 { return false, fmt.Errorf("expected devices to be not present") } return true, nil From a26f4d855d5f4fdd391068f30720c7413e4f6e28 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Wed, 26 Apr 2023 16:59:57 +0100 Subject: [PATCH 5/7] node: device-plugin: e2e: Capture pod admission failure This test captures the scenario where, after a kubelet restart, the application pod comes up before the device plugin pod has re-registered itself, and the pod therefore fails with an admission error. It is worth noting that once the device plugin pod has registered itself, another application pod requesting devices ends up running successfully. For the test case where the kubelet is restarted and the device plugin has re-registered without involving a pod restart, since the first pod ends up with an admission error after the kubelet restart, we cannot be certain which device the second pod (pod2) would get. As long as it gets a device, we consider the test to pass.
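To illustrate the relaxed assertion described above (a sketch only, not part of the patch; `anyDeviceAssigned` is a made-up helper, while the log format is the one the stub-device busybox pod already prints in this test): after re-registration the check only needs to confirm that pod2 was handed some sample device, not a particular ID.

package main

import (
	"fmt"
	"regexp"
)

// anyDeviceAssigned extracts whichever stub device the container reports in its log.
// Any Dev-N is acceptable; there is deliberately no comparison against the device
// previously held by pod1.
func anyDeviceAssigned(containerLog string) (string, bool) {
	deviceIDRE := regexp.MustCompile(`stub devices: (Dev-[0-9]+)`)
	m := deviceIDRE.FindStringSubmatch(containerLog)
	if len(m) < 2 || m[1] == "" {
		return "", false
	}
	return m[1], true
}

func main() {
	devID, ok := anyDeviceAssigned("stub devices: Dev-3")
	fmt.Println(devID, ok) // Dev-3 true — the test passes as long as ok is true
}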
Signed-off-by: Swati Sehgal --- test/e2e_node/device_plugin_test.go | 48 +++++++++++++++++------------ 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index 5ac7b2c1192..a436fa45f43 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -313,8 +313,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod1.Name, f.Namespace.Name, 1*time.Minute) - framework.ExpectNoError(err) + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. // note we don't check again the logs of the container: the check is done at startup, the container @@ -351,6 +355,26 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.By("Wait for node to be ready again") e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. + ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after pod restart") + ginkgo.By("Re-Register resources by deleting the plugin pod") gp := int64(0) deleteOptions := metav1.DeleteOptions{ @@ -370,36 +394,20 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. - // note we don't check again the logs of the container: the check is done at startup, the container - // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check - // is useless. 
- ginkgo.By("Verifying the device assignment after kubelet and device plugin restart using podresources API") - gomega.Eventually(ctx, func() error { - v1PodResources, err = getV1NodeDevices(ctx) - return err - }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet and device plugin restart") - - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after pod restart") - ginkgo.By("Creating another pod") pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) - ginkgo.By("Checking that pod got a different fake device") + ginkgo.By("Checking that pod got a fake device") devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) - gomega.Expect(devID1).To(gomega.Not(gomega.Equal(devID2)), "pod2 requested a device but started successfully without") - ginkgo.By("Verifying the device assignment after kubelet restart and device plugin re-registration using podresources API") // note we don't use eventually: the kubelet is supposed to be running and stable by now, so the call should just succeed v1PodResources, err = getV1NodeDevices(ctx) if err != nil { framework.ExpectNoError(err, "getting pod resources assignment after pod restart") } - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod1") + err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2}) framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) From 3dbb741c97608130ac746168c355dfe79a4e99e2 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Wed, 26 Apr 2023 17:41:14 +0100 Subject: [PATCH 6/7] node: device-plugin: add node reboot test scenario Add a test suit to simulate node reboot (achieved by removing pods using CRI API before kubelet is restarted). 
Signed-off-by: Swati Sehgal --- test/e2e_node/device_plugin_test.go | 202 ++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index a436fa45f43..899c45775dc 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -18,7 +18,9 @@ package e2enode import ( "context" + "errors" "fmt" + "os" "path/filepath" "regexp" "strings" @@ -32,6 +34,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" admissionapi "k8s.io/pod-security-admission/api" @@ -43,6 +47,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" ) var ( @@ -55,6 +60,7 @@ var _ = SIGDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:Devic f := framework.NewDefaultFramework("device-plugin-errors") f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged testDevicePlugin(f, kubeletdevicepluginv1beta1.DevicePluginPath) + testDevicePluginNodeReboot(f, kubeletdevicepluginv1beta1.DevicePluginPath) }) // readDaemonSetV1OrDie reads daemonset object from bytes. Panics on error. @@ -414,6 +420,202 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { }) } +func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { + ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() { + var devicePluginPod *v1.Pod + var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse + var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse + var triggerPathFile, triggerPathDir string + var err error + ginkgo.BeforeEach(func(ctx context.Context) { + ginkgo.By("Wait for node to be ready") + gomega.Eventually(ctx, func(ctx context.Context) bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + // Before we run the device plugin test, we need to ensure + // that the cluster is in a clean state and there are no + // pods running on this node. + // This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of it's resources may still be in progress. 
+ // xref: https://issue.k8s.io/115381 + gomega.Eventually(ctx, func(ctx context.Context) error { + v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err) + } + + v1PodResources, err = getV1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err) + } + + if len(v1alphaPodResources.PodResources) > 0 { + return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources) + } + + if len(v1PodResources.PodResources) > 0 { + return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) + } + return nil + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) + + ginkgo.By("Setting up the directory and file for controlling registration") + triggerPathDir = filepath.Join(devicePluginDir, "sample") + if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) { + err := os.Mkdir(triggerPathDir, os.ModePerm) + if err != nil { + klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err) + panic(err) + } + klog.InfoS("Directory created successfully") + + triggerPathFile = filepath.Join(triggerPathDir, "registration") + if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) { + _, err = os.Create(triggerPathFile) + if err != nil { + klog.Errorf("File creation %s failed: %v ", triggerPathFile, err) + panic(err) + } + } + } + + ginkgo.By("Scheduling a sample device plugin pod") + data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML) + if err != nil { + framework.Fail(err.Error()) + } + ds := readDaemonSetV1OrDie(data) + + dp := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: SampleDevicePluginName, + }, + Spec: ds.Spec.Template.Spec, + } + + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + + go func() { + // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE + // environment variable is specified), device plugin registration needs to be triggerred + // manually. + // This is done by deleting the control file at the following path: + // `/var/lib/kubelet/device-plugins/sample/registration`. 
+ + defer ginkgo.GinkgoRecover() + framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile) + err := os.Remove(triggerPathFile) + framework.ExpectNoError(err) + }() + + ginkgo.By("Waiting for devices to become available on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && CountSampleDeviceCapacity(node) > 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + framework.Logf("Successfully created device plugin pod") + + ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount)) + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && + CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) + }) + + ginkgo.AfterEach(func(ctx context.Context) { + ginkgo.By("Deleting the device plugin pod") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) + } + + err = os.Remove(triggerPathDir) + framework.ExpectNoError(err) + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && CountSampleDeviceCapacity(node) <= 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + + ginkgo.By("devices now unavailable on the local node") + }) + + // simulate node reboot scenario by removing pods using CRI before kubelet is started. In addition to that, + // intentionally a scenario is created where after node reboot, application pods requesting devices appear before the device plugin pod + // exposing those devices as resource has re-registers itself to Kubelet. The expected behavior is that the application pod fails at + // admission time. 
+ ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + + gomega.Expect(devID1).To(gomega.Not(gomega.Equal(""))) + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("stopping the kubelet") + startKubelet := stopKubelet() + + ginkgo.By("stopping all the local containers - using CRI") + rs, _, err := getCRIClient() + framework.ExpectNoError(err) + sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{}) + framework.ExpectNoError(err) + for _, sandbox := range sandboxes { + gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil()) + ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id)) + + err := rs.RemovePodSandbox(ctx, sandbox.Id) + framework.ExpectNoError(err) + } + + ginkgo.By("restarting the kubelet") + startKubelet() + + ginkgo.By("Wait for node to be ready again") + e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. + ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after node reboot") + + }) + }) +} + // makeBusyboxPod returns a simple Pod spec with a busybox container // that requests SampleDeviceResourceName and runs the specified command. 
func makeBusyboxPod(SampleDeviceResourceName, cmd string) *v1.Pod {

From d727df1741e68f7159c95d3b5a7f55535d125e5e Mon Sep 17 00:00:00 2001
From: Swati Sehgal
Date: Thu, 27 Apr 2023 11:26:50 +0100
Subject: [PATCH 7/7] node: device-plugin: e2e: Additional test cases

Additional test cases added:
Keeps device plugin assignments across pod and kubelet restarts (no device plugin re-registration)
Keeps device plugin assignments after the device plugin has re-registered (no kubelet or pod restart)

Signed-off-by: Swati Sehgal
---
 test/e2e_node/device_plugin_test.go | 119 ++++++++++++++++++++++++++++
 1 file changed, 119 insertions(+)

diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go
index 899c45775dc..e4d598348f9 100644
--- a/test/e2e_node/device_plugin_test.go
+++ b/test/e2e_node/device_plugin_test.go
@@ -340,6 +340,125 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
			framework.ExpectNoError(err, "inconsistent device assignment after pod restart")
		})
+	// simulate kubelet and container restart, *but not* device plugin re-registration.
+	// The device assignment should be kept and be stable across the kubelet and container restart, because it's the kubelet which
+	// performs the device allocation, and the device plugin is stable.
+	ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin re-registration)", func(ctx context.Context) {
+		podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart)
+		pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
+		deviceIDRE := "stub devices: (Dev-[0-9]+)"
+		devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE)
+		framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name)
+
+		gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without")
+
+		pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Wait for node to be ready again")
+		e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute)
+
+		ginkgo.By("Waiting for container to restart")
+		ensurePodContainerRestart(ctx, f, pod1.Name, pod1.Name)
+
+		ginkgo.By("Confirming that after a container restart, fake-device assignment is kept")
+		devIDRestart1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE)
+		framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name)
+		framework.ExpectEqual(devIDRestart1, devID1)
+
+		ginkgo.By("Restarting Kubelet")
+		restartKubelet(true)
+
+		ginkgo.By("Wait for node to be ready again")
+		e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute)
+
+		ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
+		gomega.Eventually(ctx, getPod).
+			WithArguments(f, pod1.Name).
+			WithTimeout(time.Minute).
+			Should(HaveFailedWithAdmissionError(),
+				"the pod succeeded to start, when it should fail with the admission error")
+
+		// crosscheck that the device assignment is preserved and stable from the perspective of the kubelet.
+		// note we don't check the logs of the container again: the check is done at startup, and the container
+		// never restarted (it runs "forever" from this test's timescale perspective), hence re-doing this check
+		// is useless.
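+		// Instead, the podresources API below is used as the kubelet's own view of the device assignment.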
+ ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after pod restart") + }) + + // simulate device plugin re-registration, *but not* container and kubelet restart. + // After the device plugin has re-registered, the list healthy devices is repopulated based on the devices discovered. + // Once Pod2 is running we determine the device that was allocated it. As long as the device allocation succeeds the + // test should pass. + + ginkgo.It("Keeps device plugin assignments after the device plugin has been re-registered (no kubelet, pod restart)", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Wait for node to be ready again") + e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + + ginkgo.By("Re-Register resources and delete the plugin pod") + gp := int64(0) + deleteOptions := metav1.DeleteOptions{ + GracePeriodSeconds: &gp, + } + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, time.Minute) + waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + + ginkgo.By("Recreating the plugin pod") + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) + err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) + framework.ExpectNoError(err) + + ginkgo.By("Waiting for resource to become available on the local node after re-registration") + gomega.Eventually(ctx, func() bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && + CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) + + // crosscheck that after device plugin re-registration the device assignment is preserved and + // stable from the kubelet's perspective. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. 
+ ginkgo.By("Verifying the device assignment after device plugin re-registration using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after pod restart") + + ginkgo.By("Creating another pod") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod2.Name, f.Namespace.Name, 1*time.Minute) + framework.ExpectNoError(err) + + ginkgo.By("Checking that pod got a fake device") + devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) + + gomega.Expect(devID2).To(gomega.Not(gomega.Equal("")), "pod2 requested a device but started successfully without") + }) + // simulate kubelet restart *and* device plugin re-registration, while the pod and the container stays running. // The device assignment should be kept and be stable across the kubelet/device plugin restart, as both the aforementioned components // orchestrate the device allocation: the actual consumer (container) is stable.