diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 8cb57aa8190..7499de4460f 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -544,15 +544,29 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
 			return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", string(podUID), contName, resource, devices.Len(), required)
 		}
 	}
+
+	klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
+	healthyDevices, hasRegistered := m.healthyDevices[resource]
+
+	// Check if resource registered with devicemanager
+	if !hasRegistered {
+		return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
+	}
+
+	// Check if registered resource has healthy devices
+	if healthyDevices.Len() == 0 {
+		return nil, fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resource)
+	}
+
+	// Check if all the previously allocated devices are healthy
+	if !healthyDevices.IsSuperset(devices) {
+		return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
+	}
+
 	if needed == 0 {
 		// No change, no work.
 		return nil, nil
 	}
-	klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
-	// Check if resource registered with devicemanager
-	if _, ok := m.healthyDevices[resource]; !ok {
-		return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
-	}
 
 	// Declare the list of allocated devices.
 	// This will be populated and returned below.
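// The three new guards above all operate on sets.String values. A minimal, runnable
// sketch of their semantics (illustrative only, not part of the patch; the variable
// names simply mirror the ones used in devicesToAllocate):
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Healthy devices as reported by the plugin, keyed by resource name.
	healthyDevices := map[string]sets.String{
		"example.com/dev": sets.NewString("dev1", "dev2"),
	}
	// Devices previously allocated to the container (e.g. restored from checkpoint).
	devices := sets.NewString("dev1", "dev3")

	healthy, hasRegistered := healthyDevices["example.com/dev"]
	switch {
	case !hasRegistered:
		fmt.Println("resource not registered -> admission error")
	case healthy.Len() == 0:
		fmt.Println("registered, but no healthy devices -> admission error")
	case !healthy.IsSuperset(devices):
		// dev3 is no longer reported healthy, so the prior allocation cannot be honored.
		fmt.Println("previously allocated devices no longer healthy -> admission error")
	default:
		fmt.Println("allocation can proceed")
	}
}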
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 1a91ae2ee0c..56881f9e25a 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -993,6 +993,102 @@ func TestPodContainerDeviceAllocation(t *testing.T) { } +func TestPodContainerDeviceToAllocate(t *testing.T) { + resourceName1 := "domain1.com/resource1" + resourceName2 := "domain2.com/resource2" + resourceName3 := "domain2.com/resource3" + as := require.New(t) + tmpDir, err := os.MkdirTemp("", "checkpoint") + as.Nil(err) + defer os.RemoveAll(tmpDir) + + testManager := &ManagerImpl{ + endpoints: make(map[string]endpointInfo), + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: newPodDevices(), + } + + testManager.podDevices.insert("pod1", "con1", resourceName1, + constructDevices([]string{"dev1", "dev2"}), + constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}, + map[string]string{"/home/r2lib1": "/usr/r2lib1"}, + map[string]string{"r2devices": "dev1 dev2"})) + testManager.podDevices.insert("pod2", "con2", resourceName2, + checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, + constructAllocResp(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testManager.podDevices.insert("pod3", "con3", resourceName3, + checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, + constructAllocResp(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + + // no healthy devices for resourceName1 and devices corresponding to + // resource2 are intentionally omitted to simulate that the resource + // hasn't been registered. 
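// The healthy-device wiring below exercises the three new failure paths in
// devicesToAllocate: resourceName1 is registered but has no healthy devices,
// resourceName2 is never registered with the manager at all, and resourceName3
// is registered but the previously allocated dev5 has been replaced by dev7/dev8
// in the healthy set, so the prior allocation can no longer be honored.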
+ testManager.healthyDevices[resourceName1] = sets.NewString() + testManager.healthyDevices[resourceName3] = sets.NewString() + // dev5 is no longer in the list of healthy devices + testManager.healthyDevices[resourceName3].Insert("dev7") + testManager.healthyDevices[resourceName3].Insert("dev8") + + testCases := []struct { + description string + podUID string + contName string + resource string + required int + reusableDevices sets.String + expectedAllocatedDevices sets.String + expErr error + }{ + { + description: "Admission error in case no healthy devices are present to allocate", + podUID: "pod1", + contName: "con1", + resource: resourceName1, + required: 2, + reusableDevices: sets.NewString(), + expectedAllocatedDevices: nil, + expErr: fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resourceName1), + }, + { + description: "Admission error in case resource is not registered", + podUID: "pod2", + contName: "con2", + resource: resourceName2, + required: 1, + reusableDevices: sets.NewString(), + expectedAllocatedDevices: nil, + expErr: fmt.Errorf("cannot allocate unregistered device %s", resourceName2), + }, + { + description: "Admission error in case devices previously allocated are no longer healthy", + podUID: "pod3", + contName: "con3", + resource: resourceName3, + required: 1, + reusableDevices: sets.NewString(), + expectedAllocatedDevices: nil, + expErr: fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resourceName3), + }, + } + + for _, testCase := range testCases { + allocDevices, err := testManager.devicesToAllocate(testCase.podUID, testCase.contName, testCase.resource, testCase.required, testCase.reusableDevices) + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("devicePluginManager error (%v). expected error: %v but got: %v", + testCase.description, testCase.expErr, err) + } + if !reflect.DeepEqual(allocDevices, testCase.expectedAllocatedDevices) { + t.Errorf("devicePluginManager error (%v). 
expected error: %v but got: %v", + testCase.description, testCase.expectedAllocatedDevices, allocDevices) + } + } + +} + func TestGetDeviceRunContainerOptions(t *testing.T) { res1 := TestResource{ resourceName: "domain1.com/resource1", diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go index 2892dae1175..95434849d34 100644 --- a/test/e2e_node/device_manager_test.go +++ b/test/e2e_node/device_manager_test.go @@ -18,6 +18,7 @@ package e2enode import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -27,7 +28,11 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -39,9 +44,13 @@ import ( e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" + testutils "k8s.io/kubernetes/test/utils" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" + "github.com/onsi/gomega/types" ) const ( @@ -266,6 +275,239 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur }) }) + + /* + This end to end test is to simulate a scenario where after kubelet restart/node + reboot application pods requesting devices appear before the device plugin + pod exposing those devices as resources. + + The happy path is where after node reboot/ kubelet restart, the device plugin pod + appears before the application pod. This PR and this e2e test + aims to tackle the scenario where device plugin either does not appear first + or doesn't get the chance to re-register itself. + + Since there is no way of controlling the order in which the pods appear after + kubelet restart/node reboot, we can't guarantee that the application pod + recovers before device plugin pod (the scenario we want to exercise here). + If the device plugin pod is recovered before the test pod, we still can + meaningfully reproduce the scenario by NOT sending the registration command. + To do so sample device plugin is enhanced. For implementation details, refer to: + `test/images/sample-device-plugin/sampledeviceplugin.go`. This enhancement + allows auto-registration of the plugin to be controlled with the help of an environment + variable: REGISTER_CONTROL_FILE. By default this environment variable is not present + and the device plugin autoregisters to kubelet. For this e2e test, we use sample device + plugin spec with REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration + to allow manual registeration of the plugin to allow an application pod (requesting devices) + to successfully run on the node followed by kubelet restart where device plugin doesn't + register and the application pod fails with admission error. + + Breakdown of the steps implemented as part of this e2e test is as follows: + 1. Create a file `registration` at path `/var/lib/kubelet/device-plugins/sample/` + 2. Create sample device plugin with an environment variable with + `REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration` that + waits for a client to delete the control file. + 3. 
Trigger plugin registeration by deleting the abovementioned directory. + 4. Create a test pod requesting devices exposed by the device plugin. + 5. Stop kubelet. + 6. Remove pods using CRI to ensure new pods are created after kubelet restart. + 7. Restart kubelet. + 8. Wait for the sample device plugin pod to be running. In this case, + the registration is not triggered. + 9. Ensure that resource capacity/allocatable exported by the device plugin is zero. + 10. The test pod should fail with `UnexpectedAdmissionError` + 11. Delete the test pod. + 12. Delete the sample device plugin pod. + 13. Remove `/var/lib/kubelet/device-plugins/sample/` and its content, the directory created to control registration + */ + ginkgo.Context("With sample device plugin [Serial] [Disruptive]", func() { + var deviceCount int = 2 + var devicePluginPod *v1.Pod + var triggerPathFile, triggerPathDir string + + // this test wants to reproduce what happened in https://github.com/kubernetes/kubernetes/issues/109595 + ginkgo.BeforeEach(func(ctx context.Context) { + ginkgo.By("Wait for node to be ready") + gomega.Eventually(ctx, e2enode.TotalReady). + WithArguments(f.ClientSet). + WithTimeout(time.Minute). + Should(gomega.BeEquivalentTo(1)) + + ginkgo.By("Setting up the directory and file for controlling registration") + triggerPathDir = filepath.Join(devicePluginDir, "sample") + if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) { + err := os.Mkdir(triggerPathDir, os.ModePerm) + if err != nil { + klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err) + panic(err) + } + klog.InfoS("Directory created successfully") + + triggerPathFile = filepath.Join(triggerPathDir, "registration") + if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) { + _, err = os.Create(triggerPathFile) + if err != nil { + klog.Errorf("File creation %s failed: %v ", triggerPathFile, err) + panic(err) + } + } + } + + ginkgo.By("Scheduling a sample device plugin pod") + data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML) + if err != nil { + framework.Fail(err.Error()) + } + ds := readDaemonSetV1OrDie(data) + + dp := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: SampleDevicePluginName, + }, + Spec: ds.Spec.Template.Spec, + } + + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + + go func() { + // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE + // environment variable is specified), device plugin registration needs to be triggerred + // manually. + // This is done by deleting the control file at the following path: + // `/var/lib/kubelet/device-plugins/sample/registration`. + + defer ginkgo.GinkgoRecover() + framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile) + err := os.Remove(triggerPathFile) + framework.ExpectNoError(err) + }() + + ginkgo.By("Waiting for devices to become available on the local node") + + gomega.Eventually(ctx, isNodeReadyWithSampleResources). + WithArguments(f). + WithTimeout(5 * time.Minute). + Should(BeReady()) + + framework.Logf("Successfully created device plugin pod") + + devsLen := int64(deviceCount) // shortcut + ginkgo.By("Waiting for the resource exported by the sample device plugin to become available on the local node") + + gomega.Eventually(ctx, isNodeReadyWithAllocatableSampleResources). + WithArguments(f, devsLen). + WithTimeout(5 * time.Minute). 
+ Should(HaveAllocatableDevices()) + }) + + ginkgo.It("should deploy pod consuming devices first but fail with admission error after kubelet restart in case device plugin hasn't re-registered", func(ctx context.Context) { + var err error + podCMD := "while true; do sleep 1000; done;" + + ginkgo.By(fmt.Sprintf("creating a pods requiring %d %q", deviceCount, SampleDeviceResourceName)) + + pod := makeBusyboxDeviceRequiringPod(SampleDeviceResourceName, podCMD) + testPod := e2epod.NewPodClient(f).CreateSync(ctx, pod) + + ginkgo.By("making sure all the pods are ready") + + err = e2epod.WaitForPodCondition(ctx, f.ClientSet, testPod.Namespace, testPod.Name, "Ready", 120*time.Second, testutils.PodRunningReady) + framework.ExpectNoError(err, "pod %s/%s did not go running", testPod.Namespace, testPod.Name) + framework.Logf("pod %s/%s running", testPod.Namespace, testPod.Name) + + ginkgo.By("stopping the kubelet") + startKubelet := stopKubelet() + + ginkgo.By("stopping all the local containers - using CRI") + rs, _, err := getCRIClient() + framework.ExpectNoError(err) + sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{}) + framework.ExpectNoError(err) + for _, sandbox := range sandboxes { + gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil()) + ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id)) + + err := rs.RemovePodSandbox(ctx, sandbox.Id) + framework.ExpectNoError(err) + } + + ginkgo.By("restarting the kubelet") + startKubelet() + + ginkgo.By("waiting for the kubelet to be ready again") + // Wait for the Kubelet to be ready. + + gomega.Eventually(ctx, e2enode.TotalReady). + WithArguments(f.ClientSet). + WithTimeout(2 * time.Minute). + Should(gomega.BeEquivalentTo(1)) + + ginkgo.By("making sure all the pods are ready after the recovery") + + var devicePluginPodAfterRestart *v1.Pod + + devicePluginPodAfterRestart, err = e2epod.NewPodClient(f).Get(ctx, devicePluginPod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + err = e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name, "Ready", 120*time.Second, testutils.PodRunningReady) + framework.ExpectNoError(err, "pod %s/%s did not go running", devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name) + framework.Logf("pod %s/%s running", devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name) + + ginkgo.By("Waiting for the resource capacity/allocatable exported by the sample device plugin to become zero") + + // The device plugin pod has restarted but has not re-registered to kubelet (as AUTO_REGISTER= false) + // and registration wasn't triggered manually (by writing to the unix socket exposed at + // `/var/lib/kubelet/device-plugins/registered`). Because of this, the capacity and allocatable corresponding + // to the resource exposed by the device plugin should be zero. + + gomega.Eventually(ctx, isNodeReadyWithAllocatableSampleResources). + WithArguments(f, int64(0)). + WithTimeout(5 * time.Minute). + Should(HaveAllocatableDevices()) + + ginkgo.By("Checking that pod requesting devices failed to start because of admission error") + + // NOTE: The device plugin won't re-register again and this is intentional. + // Because of this, the testpod (requesting a device) should fail with an admission error. + + gomega.Eventually(ctx, getPod). + WithArguments(f, testPod.Name). + WithTimeout(time.Minute). 
+ Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + ginkgo.By("removing application pods") + e2epod.NewPodClient(f).DeleteSync(ctx, testPod.Name, metav1.DeleteOptions{}, 2*time.Minute) + }) + + ginkgo.AfterEach(func(ctx context.Context) { + ginkgo.By("Deleting the device plugin pod") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) + + ginkgo.By("Deleting the directory and file setup for controlling registration") + err := os.RemoveAll(triggerPathDir) + framework.ExpectNoError(err) + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) + } + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(ctx, isNodeReadyWithoutSampleResources). + WithArguments(f). + WithTimeout(5 * time.Minute). + Should(BeReady()) + }) + + }) + }) func compareSRIOVResources(expected, got *sriovData) { @@ -351,3 +593,153 @@ func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) s sort.Strings(entries) return strings.Join(entries, ", ") } + +func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod { + podName := "device-manager-test-" + string(uuid.NewUUID()) + rl := v1.ResourceList{ + v1.ResourceName(resourceName): *resource.NewQuantity(2, resource.DecimalSI), + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{{ + Image: busyboxImage, + Name: podName, + // Runs the specified command in the test pod. + Command: []string{"sh", "-c", cmd}, + Resources: v1.ResourceRequirements{ + Limits: rl, + Requests: rl, + }, + }}, + }, + } +} + +// BeReady verifies that a node is ready and devices have registered. +func BeReady() types.GomegaMatcher { + return gomega.And( + // This additional matcher checks for the final error condition. + gcustom.MakeMatcher(func(ready bool) (bool, error) { + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + return true, nil + }), + BeInReadyPhase(true), + ) +} + +// BeInReadyPhase matches if node is ready i.e. ready is true. +func BeInReadyPhase(isReady bool) types.GomegaMatcher { + return gcustom.MakeMatcher(func(ready bool) (bool, error) { + return ready == isReady, nil + }).WithTemplate("expected Node Ready {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(isReady) +} + +func isNodeReadyWithSampleResources(ctx context.Context, f *framework.Framework) (bool, error) { + node, ready := getLocalTestNode(ctx, f) + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + + if CountSampleDeviceCapacity(node) <= 0 { + return false, fmt.Errorf("expected devices to be advertised") + } + return true, nil +} + +// HaveAllocatableDevices verifies that a node has allocatable devices. +func HaveAllocatableDevices() types.GomegaMatcher { + return gomega.And( + // This additional matcher checks for the final error condition. 
+ gcustom.MakeMatcher(func(hasAllocatable bool) (bool, error) { + if !hasAllocatable { + return false, fmt.Errorf("expected node to have allocatable devices=%t", hasAllocatable) + } + return true, nil + }), + hasAllocatable(true), + ) +} + +// hasAllocatable matches if the node has allocatable devices, i.e. hasAllocatable is true. +func hasAllocatable(hasAllocatable bool) types.GomegaMatcher { + return gcustom.MakeMatcher(func(hasAllocatableDevices bool) (bool, error) { + return hasAllocatableDevices == hasAllocatable, nil + }).WithTemplate("expected Node with allocatable {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(hasAllocatable) +} + +func isNodeReadyWithAllocatableSampleResources(ctx context.Context, f *framework.Framework, devCount int64) (bool, error) { + node, ready := getLocalTestNode(ctx, f) + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + + if CountSampleDeviceCapacity(node) != devCount { + return false, fmt.Errorf("expected devices capacity to be: %d", devCount) + } + + if CountSampleDeviceAllocatable(node) != devCount { + return false, fmt.Errorf("expected devices allocatable to be: %d", devCount) + } + return true, nil +} + +func isNodeReadyWithoutSampleResources(ctx context.Context, f *framework.Framework) (bool, error) { + node, ready := getLocalTestNode(ctx, f) + if !ready { + return false, fmt.Errorf("expected node to be ready=%t", ready) + } + + if CountSampleDeviceCapacity(node) > 0 { + return false, fmt.Errorf("expected devices not to be present") + } + return true, nil +} + +// HaveFailedWithAdmissionError verifies that a pod fails at admission. +func HaveFailedWithAdmissionError() types.GomegaMatcher { + return gomega.And( + gcustom.MakeMatcher(func(hasFailed bool) (bool, error) { + if !hasFailed { + return false, fmt.Errorf("expected pod to have failed=%t", hasFailed) + } + return true, nil + }), + hasFailed(true), + ) +} + +// hasFailed matches if pod has failed. +func hasFailed(hasFailed bool) types.GomegaMatcher { + return gcustom.MakeMatcher(func(hasPodFailed bool) (bool, error) { + return hasPodFailed == hasFailed, nil + }).WithTemplate("expected Pod failed {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(hasFailed) +} + +func getPod(ctx context.Context, f *framework.Framework, podName string) (bool, error) { + + pod, err := e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("expected to get pod %q, got err=%v", podName, err) + } + + expectedStatusReason := "UnexpectedAdmissionError" + expectedStatusMessage := "Allocate failed due to no healthy devices present; cannot allocate unhealthy devices" + + // Check the final pod status for the expected admission failure. 
+ if pod.Status.Phase != v1.PodFailed { + return false, fmt.Errorf("expected pod to reach phase %q, got final phase %q instead.", v1.PodFailed, pod.Status.Phase) + } + if pod.Status.Reason != expectedStatusReason { + return false, fmt.Errorf("expected pod status reason to be %q, got %q instead.", expectedStatusReason, pod.Status.Reason) + } + if !strings.Contains(pod.Status.Message, expectedStatusMessage) { + return false, fmt.Errorf("expected pod status reason to contain %q, got %q instead.", expectedStatusMessage, pod.Status.Message) + } + return true, nil +} diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index 5ac7b2c1192..e4d598348f9 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -18,7 +18,9 @@ package e2enode import ( "context" + "errors" "fmt" + "os" "path/filepath" "regexp" "strings" @@ -32,6 +34,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" admissionapi "k8s.io/pod-security-admission/api" @@ -43,6 +47,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" ) var ( @@ -55,6 +60,7 @@ var _ = SIGDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:Devic f := framework.NewDefaultFramework("device-plugin-errors") f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged testDevicePlugin(f, kubeletdevicepluginv1beta1.DevicePluginPath) + testDevicePluginNodeReboot(f, kubeletdevicepluginv1beta1.DevicePluginPath) }) // readDaemonSetV1OrDie reads daemonset object from bytes. Panics on error. @@ -313,8 +319,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod1.Name, f.Namespace.Name, 1*time.Minute) - framework.ExpectNoError(err) + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. // note we don't check again the logs of the container: the check is done at startup, the container @@ -330,6 +340,125 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.ExpectNoError(err, "inconsistent device assignment after pod restart") }) + // simulate kubelet and container restart, *but not* device plugin re-registration. + // The device assignment should be kept and be stable across the kubelet and container restart, because it's the kubelet which + // performs the device allocation, and both the device plugin is stable. 
+ ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin re-registration)", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + + gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Wait for node to be ready again") + e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + + ginkgo.By("Waiting for container to restart") + ensurePodContainerRestart(ctx, f, pod1.Name, pod1.Name) + + ginkgo.By("Confirming that after a container restart, fake-device assignment is kept") + devIDRestart1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + framework.ExpectEqual(devIDRestart1, devID1) + + ginkgo.By("Restarting Kubelet") + restartKubelet(true) + + ginkgo.By("Wait for node to be ready again") + e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. + ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after pod restart") + }) + + // simulate device plugin re-registration, *but not* container and kubelet restart. + // After the device plugin has re-registered, the list healthy devices is repopulated based on the devices discovered. + // Once Pod2 is running we determine the device that was allocated it. As long as the device allocation succeeds the + // test should pass. 
+ + ginkgo.It("Keeps device plugin assignments after the device plugin has been re-registered (no kubelet, pod restart)", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Wait for node to be ready again") + e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + + ginkgo.By("Re-Register resources and delete the plugin pod") + gp := int64(0) + deleteOptions := metav1.DeleteOptions{ + GracePeriodSeconds: &gp, + } + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, time.Minute) + waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) + + ginkgo.By("Recreating the plugin pod") + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) + err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) + framework.ExpectNoError(err) + + ginkgo.By("Waiting for resource to become available on the local node after re-registration") + gomega.Eventually(ctx, func() bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && + CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) + + // crosscheck that after device plugin re-registration the device assignment is preserved and + // stable from the kubelet's perspective. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. 
+ ginkgo.By("Verifying the device assignment after device plugin re-registration using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after pod restart") + + ginkgo.By("Creating another pod") + pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod2.Name, f.Namespace.Name, 1*time.Minute) + framework.ExpectNoError(err) + + ginkgo.By("Checking that pod got a fake device") + devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) + + gomega.Expect(devID2).To(gomega.Not(gomega.Equal("")), "pod2 requested a device but started successfully without") + }) + // simulate kubelet restart *and* device plugin re-registration, while the pod and the container stays running. // The device assignment should be kept and be stable across the kubelet/device plugin restart, as both the aforementioned components // orchestrate the device allocation: the actual consumer (container) is stable. @@ -351,6 +480,26 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.By("Wait for node to be ready again") e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. + ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after pod restart") + ginkgo.By("Re-Register resources by deleting the plugin pod") gp := int64(0) deleteOptions := metav1.DeleteOptions{ @@ -370,42 +519,222 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. 
- // note we don't check again the logs of the container: the check is done at startup, the container - // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check - // is useless. - ginkgo.By("Verifying the device assignment after kubelet and device plugin restart using podresources API") - gomega.Eventually(ctx, func() error { - v1PodResources, err = getV1NodeDevices(ctx) - return err - }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet and device plugin restart") - - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after pod restart") - ginkgo.By("Creating another pod") pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) - ginkgo.By("Checking that pod got a different fake device") + ginkgo.By("Checking that pod got a fake device") devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) - gomega.Expect(devID1).To(gomega.Not(gomega.Equal(devID2)), "pod2 requested a device but started successfully without") - ginkgo.By("Verifying the device assignment after kubelet restart and device plugin re-registration using podresources API") // note we don't use eventually: the kubelet is supposed to be running and stable by now, so the call should just succeed v1PodResources, err = getV1NodeDevices(ctx) if err != nil { framework.ExpectNoError(err, "getting pod resources assignment after pod restart") } - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) - framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod1") + err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2}) framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) }) } +func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { + ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() { + var devicePluginPod *v1.Pod + var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse + var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse + var triggerPathFile, triggerPathDir string + var err error + ginkgo.BeforeEach(func(ctx context.Context) { + ginkgo.By("Wait for node to be ready") + gomega.Eventually(ctx, func(ctx context.Context) bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + // Before we run the device plugin test, we need to ensure + // that the cluster is in a clean state and there are no + // pods running on this node. + // This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of it's resources may still be in progress. 
+ // xref: https://issue.k8s.io/115381 + gomega.Eventually(ctx, func(ctx context.Context) error { + v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err) + } + + v1PodResources, err = getV1NodeDevices(ctx) + if err != nil { + return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err) + } + + if len(v1alphaPodResources.PodResources) > 0 { + return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources) + } + + if len(v1PodResources.PodResources) > 0 { + return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) + } + return nil + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) + + ginkgo.By("Setting up the directory and file for controlling registration") + triggerPathDir = filepath.Join(devicePluginDir, "sample") + if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) { + err := os.Mkdir(triggerPathDir, os.ModePerm) + if err != nil { + klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err) + panic(err) + } + klog.InfoS("Directory created successfully") + + triggerPathFile = filepath.Join(triggerPathDir, "registration") + if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) { + _, err = os.Create(triggerPathFile) + if err != nil { + klog.Errorf("File creation %s failed: %v ", triggerPathFile, err) + panic(err) + } + } + } + + ginkgo.By("Scheduling a sample device plugin pod") + data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML) + if err != nil { + framework.Fail(err.Error()) + } + ds := readDaemonSetV1OrDie(data) + + dp := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: SampleDevicePluginName, + }, + Spec: ds.Spec.Template.Spec, + } + + devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) + + go func() { + // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE + // environment variable is specified), device plugin registration needs to be triggerred + // manually. + // This is done by deleting the control file at the following path: + // `/var/lib/kubelet/device-plugins/sample/registration`. 
+ + defer ginkgo.GinkgoRecover() + framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile) + err := os.Remove(triggerPathFile) + framework.ExpectNoError(err) + }() + + ginkgo.By("Waiting for devices to become available on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && CountSampleDeviceCapacity(node) > 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + framework.Logf("Successfully created device plugin pod") + + ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount)) + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && + CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && + CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount + }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) + }) + + ginkgo.AfterEach(func(ctx context.Context) { + ginkgo.By("Deleting the device plugin pod") + e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) + + ginkgo.By("Deleting any Pods created by the test") + l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + for _, p := range l.Items { + if p.Namespace != f.Namespace.Name { + continue + } + + framework.Logf("Deleting pod: %s", p.Name) + e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) + } + + err = os.Remove(triggerPathDir) + framework.ExpectNoError(err) + + ginkgo.By("Waiting for devices to become unavailable on the local node") + gomega.Eventually(ctx, func(ctx context.Context) bool { + node, ready := getLocalTestNode(ctx, f) + return ready && CountSampleDeviceCapacity(node) <= 0 + }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) + + ginkgo.By("devices now unavailable on the local node") + }) + + // simulate node reboot scenario by removing pods using CRI before kubelet is started. In addition to that, + // intentionally a scenario is created where after node reboot, application pods requesting devices appear before the device plugin pod + // exposing those devices as resource has re-registers itself to Kubelet. The expected behavior is that the application pod fails at + // admission time. 
+ ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) + pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) + + gomega.Expect(devID1).To(gomega.Not(gomega.Equal(""))) + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("stopping the kubelet") + startKubelet := stopKubelet() + + ginkgo.By("stopping all the local containers - using CRI") + rs, _, err := getCRIClient() + framework.ExpectNoError(err) + sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{}) + framework.ExpectNoError(err) + for _, sandbox := range sandboxes { + gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil()) + ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id)) + + err := rs.RemovePodSandbox(ctx, sandbox.Id) + framework.ExpectNoError(err) + } + + ginkgo.By("restarting the kubelet") + startKubelet() + + ginkgo.By("Wait for node to be ready again") + e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) + + ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") + gomega.Eventually(ctx, getPod). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(HaveFailedWithAdmissionError(), + "the pod succeeded to start, when it should fail with the admission error") + + // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. + // note we don't check again the logs of the container: the check is done at startup, the container + // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check + // is useless. + ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + + err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + framework.ExpectNoError(err, "inconsistent device assignment after node reboot") + + }) + }) +} + // makeBusyboxPod returns a simple Pod spec with a busybox container // that requests SampleDeviceResourceName and runs the specified command. 
func makeBusyboxPod(SampleDeviceResourceName, cmd string) *v1.Pod { diff --git a/test/e2e_node/image_list.go b/test/e2e_node/image_list.go index 611590642bc..e6e7da29766 100644 --- a/test/e2e_node/image_list.go +++ b/test/e2e_node/image_list.go @@ -87,11 +87,16 @@ func updateImageAllowList(ctx context.Context) { } else { e2epod.ImagePrePullList.Insert(gpuDevicePluginImage) } - if samplePluginImage, err := getSampleDevicePluginImage(); err != nil { + if samplePluginImage, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginDSYAML); err != nil { klog.Errorln(err) } else { e2epod.ImagePrePullList.Insert(samplePluginImage) } + if samplePluginImageCtrlReg, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginControlRegistrationDSYAML); err != nil { + klog.Errorln(err) + } else { + e2epod.ImagePrePullList.Insert(samplePluginImageCtrlReg) + } } func getNodeProblemDetectorImage() string { @@ -222,19 +227,19 @@ func getGPUDevicePluginImage(ctx context.Context) (string, error) { return ds.Spec.Template.Spec.Containers[0].Image, nil } -func getSampleDevicePluginImage() (string, error) { - data, err := e2etestfiles.Read(SampleDevicePluginDSYAML) +func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) { + data, err := e2etestfiles.Read(dsYamlPath) if err != nil { - return "", fmt.Errorf("failed to read the sample plugin yaml: %w", err) + return "", fmt.Errorf("failed to read the daemonset yaml: %w", err) } ds, err := e2emanifest.DaemonSetFromData(data) if err != nil { - return "", fmt.Errorf("failed to parse daemon set for sample plugin: %w", err) + return "", fmt.Errorf("failed to parse daemonset yaml: %w", err) } if len(ds.Spec.Template.Spec.Containers) < 1 { - return "", fmt.Errorf("failed to parse the sample plugin image: cannot extract the container from YAML") + return "", fmt.Errorf("failed to parse the container image: cannot extract the container from YAML") } return ds.Spec.Template.Spec.Containers[0].Image, nil }
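// The REGISTER_CONTROL_FILE gating that the e2e tests above rely on lives in
// test/images/sample-device-plugin/sampledeviceplugin.go and is not part of this diff.
// The following is a rough, illustrative sketch of how such gating could look; the
// polling approach and the waitForRegistrationTrigger name are assumptions made for
// this example, not the actual sample device plugin implementation:
package main

import (
	"fmt"
	"os"
	"time"
)

// waitForRegistrationTrigger blocks until the control file named by the
// REGISTER_CONTROL_FILE environment variable is deleted. If the variable is
// unset, the plugin registers immediately (the default auto-registration path).
func waitForRegistrationTrigger() error {
	controlFile, ok := os.LookupEnv("REGISTER_CONTROL_FILE")
	if !ok || controlFile == "" {
		return nil // no gating requested, register right away
	}
	for {
		if _, err := os.Stat(controlFile); err != nil {
			if os.IsNotExist(err) {
				return nil // control file deleted by the test -> trigger registration
			}
			return fmt.Errorf("checking control file %q: %w", controlFile, err)
		}
		time.Sleep(time.Second)
	}
}

func main() {
	if err := waitForRegistrationTrigger(); err != nil {
		panic(err)
	}
	fmt.Println("registering device plugin with the kubelet now")
}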