From 541f2af78dc940e8e27b378b3c3c4b2531f8d4da Mon Sep 17 00:00:00 2001
From: Sergey Kanzhelev
Date: Thu, 27 Jun 2024 08:24:33 +0000
Subject: [PATCH] device plugin failure tests

---
 test/e2e_node/device_plugin_failures_test.go | 356 ++++++++++++++++++
 .../testdeviceplugin/device-plugin.go        | 237 ++++++++++++
 2 files changed, 593 insertions(+)
 create mode 100644 test/e2e_node/device_plugin_failures_test.go
 create mode 100644 test/e2e_node/testdeviceplugin/device-plugin.go

diff --git a/test/e2e_node/device_plugin_failures_test.go b/test/e2e_node/device_plugin_failures_test.go
new file mode 100644
index 00000000000..8c128653191
--- /dev/null
+++ b/test/e2e_node/device_plugin_failures_test.go
@@ -0,0 +1,356 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2enode
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+
+	v1 "k8s.io/api/core/v1"
+	kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	admissionapi "k8s.io/pod-security-admission/api"
+
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/kubernetes/test/e2e/framework"
+	"k8s.io/kubernetes/test/e2e_node/testdeviceplugin"
+)
+
+type ResourceValue struct {
+	Allocatable int
+	Capacity    int
+}
+
+// Serial because the test restarts Kubelet
+var _ = SIGDescribe("Device Plugin Failures:", framework.WithNodeConformance(), func() {
+	f := framework.NewDefaultFramework("device-plugin-failures")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+
+	var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue {
+		ginkgo.GinkgoHelper()
+		node := getLocalNode(ctx, f)
+
+		// -1 represents that the resource is not found
+		result := ResourceValue{
+			Allocatable: -1,
+			Capacity:    -1,
+		}
+
+		for key, val := range node.Status.Capacity {
+			resource := string(key)
+			if resource == resourceName {
+				result.Capacity = int(val.Value())
+				break
+			}
+		}
+
+		for key, val := range node.Status.Allocatable {
+			resource := string(key)
+			if resource == resourceName {
+				result.Allocatable = int(val.Value())
+				break
+			}
+		}
+
+		return result
+	}
+
+	var createPod = func(resourceName string, quantity int) *v1.Pod {
+		ginkgo.GinkgoHelper()
+		rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)}
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())},
+			Spec: v1.PodSpec{
+				RestartPolicy: v1.RestartPolicyAlways,
+				Containers: []v1.Container{{
+					Image:   busyboxImage,
+					Name:    "container-1",
+					Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)},
+					Resources: v1.ResourceRequirements{
+						Limits:   rl,
+						Requests: rl,
+					},
+				}},
+			},
+		}
+		return pod
+	}
+
+	nodeStatusUpdateTimeout := 1 * time.Minute
+	devicePluginUpdateTimeout := 1 * time.Minute
+	devicePluginGracefulTimeout := 5 * time.Minute // see endpointStopGracePeriod in pkg/kubelet/cm/devicemanager/types.go
+
+	ginkgo.It("when GetDevicePluginOptions fails, device plugin will not be used", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+
+		expectedErr := fmt.Errorf("GetDevicePluginOptions failed")
+
+		plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
+			if name == "GetDevicePluginOptions" {
+				return expectedErr
+			}
+			return nil
+		})
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}})
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.MatchError(gomega.ContainSubstring("failed to get device plugin options")))
+		gomega.Expect(err).To(gomega.MatchError(gomega.ContainSubstring(expectedErr.Error())))
+
+		gomega.Expect(plugin.WasCalled("ListAndWatch")).To(gomega.BeFalseBecause("plugin should not be used if GetDevicePluginOptions fails"))
+		gomega.Expect(plugin.WasCalled("GetDevicePluginOptions")).To(gomega.BeTrueBecause("get device plugin options should be called exactly once"))
+		gomega.Expect(plugin.Calls()).To(gomega.HaveLen(1))
+
+		// kubelet will not even register the resource
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
+	})
+
+	ginkgo.It("will set allocatable to zero when a single device becomes unhealthy and then back to 1 when it becomes healthy again", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+		devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
+		plugin := testdeviceplugin.NewDevicePlugin(nil)
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.Succeed())
+
+		// at first the device is healthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
+
+		// now make the device unhealthy
+		devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
+
+		// now make the device healthy again
+		devices[0].Health = kubeletdevicepluginv1beta1.Healthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
+	})
+
+	ginkgo.It("will set allocatable to zero when a single device becomes unhealthy, but capacity will stay at 1", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+		devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
+		plugin := testdeviceplugin.NewDevicePlugin(nil)
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.Succeed())
+
+		ginkgo.By("initial state: capacity and allocatable are set")
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
+
+		// schedule a pod that requests the device
+		client := e2epod.NewPodClient(f)
+		pod := client.Create(ctx, createPod(resourceName, 1))
+
+		// wait for the pod to be running
+		gomega.Expect(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)).To(gomega.Succeed())
+
+		ginkgo.By("once the pod is running, it does not affect the allocatable value")
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
+
+		// now make the device unhealthy
+		devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
+		plugin.UpdateDevices(devices)
+
+		ginkgo.By("even when the device becomes unhealthy, the pod keeps running and the capacity is kept")
+		// allocatable drops to zero, but capacity stays at 1 even though the device is no longer healthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
+
+		// pod is not affected by the device becoming unhealthy
+
+		gomega.Consistently(func() v1.PodPhase {
+			pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
+			return pod.Status.Phase
+		}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(v1.PodRunning))
+
+		// deleting the pod
+		err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+		gomega.Expect(err).To(gomega.Succeed())
+
+		// wait for the pod to be deleted
+		gomega.Eventually(func() error {
+			_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
+			return err
+		}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.MatchError(gomega.ContainSubstring("not found")))
+
+		ginkgo.By("when the pod is deleted, nothing changes")
+		gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
+	})
+
+	ginkgo.It("will lower allocatable by the number of unhealthy devices and raise it back when they become healthy again", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+
+		devices := []kubeletdevicepluginv1beta1.Device{
+			{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
+			{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
+			{ID: "2", Health: kubeletdevicepluginv1beta1.Healthy},
+			{ID: "3", Health: kubeletdevicepluginv1beta1.Healthy},
+		}
+		plugin := testdeviceplugin.NewDevicePlugin(nil)
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.Succeed())
+
+		// at first all the devices are healthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
+
+		// now make one device unhealthy
+		devices[3].Health = kubeletdevicepluginv1beta1.Unhealthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 3, Capacity: 4}))
+
+		// now make the device healthy again
+		devices[3].Health = kubeletdevicepluginv1beta1.Healthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
+
+		// now make two devices unhealthy
+		devices[1].Health = kubeletdevicepluginv1beta1.Unhealthy
+		devices[3].Health = kubeletdevicepluginv1beta1.Unhealthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 4}))
+
+		// now make one of the devices healthy again
+		devices[3].Health = kubeletdevicepluginv1beta1.Healthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 3, Capacity: 4}))
+
+		// now make the other device healthy again
+		devices[1].Health = kubeletdevicepluginv1beta1.Healthy
+		plugin.UpdateDevices(devices)
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
+	})
+
+	ginkgo.It("when ListAndWatch fails immediately, node allocatable will be set to zero and kubelet will not retry to list resources", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+		devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
+
+		// Initially, the resource is not reported on the node at all
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
+
+		plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
+			if name == "ListAndWatch" {
+				return fmt.Errorf("ListAndWatch failed")
+			}
+			return nil
+		})
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.Succeed())
+
+		// kubelet registers the resource, but will not have any allocatable
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
+
+		// kubelet will never retry ListAndWatch (this will sleep for a long time)
+		gomega.Consistently(plugin.Calls, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.HaveLen(2))
+
+		// however kubelet will not delete the resource
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
+	})
+
+	ginkgo.It("when ListAndWatch fails after provisioning devices, node allocatable will be set to zero and kubelet will not retry to list resources", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+		devices := []kubeletdevicepluginv1beta1.Device{
+			{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
+			{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
+		}
+
+		failing := false
+		plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
+			if name == "ListAndWatch" {
+				if failing {
+					return fmt.Errorf("ListAndWatch failed")
+				}
+			}
+			return nil
+		})
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.Succeed())
+
+		// at first the devices are healthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 2}))
+
+		// let's make ListAndWatch fail
+		failing = true
+
+		// kubelet will mark all devices as unhealthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
+
+		// kubelet will never retry ListAndWatch (this will sleep for a long time)
+		gomega.Consistently(plugin.Calls, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.HaveLen(2))
+
+		// however kubelet will not delete the resource and will keep the capacity
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
+
+		// after the grace period the devices' capacity will reset to zero
+		gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
+	})
+
+	ginkgo.It("when device plugin is stopped after provisioning devices, node allocatable will be set to zero", func(ctx context.Context) {
+		// randomizing so tests can run in parallel
+		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
+		devices := []kubeletdevicepluginv1beta1.Device{
+			{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
+			{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
+		}
+
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
+
+		plugin := testdeviceplugin.NewDevicePlugin(nil)
+
+		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
+		defer plugin.Stop() // should stop even if registration failed
+		gomega.Expect(err).To(gomega.Succeed())
+
+		// at first the devices are healthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 2}))
+
+		// let's unload the plugin
+		plugin.Stop()
+
+		// kubelet will mark all devices as unhealthy
+		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
+
+		// after the grace period the devices' capacity will reset to zero
+		gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
+	})
+})
diff --git a/test/e2e_node/testdeviceplugin/device-plugin.go b/test/e2e_node/testdeviceplugin/device-plugin.go
new file mode 100644
index 00000000000..bece5ec5114
--- /dev/null
+++ b/test/e2e_node/testdeviceplugin/device-plugin.go
@@ -0,0 +1,237 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testdeviceplugin
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
+)
+
+type DevicePlugin struct {
+	server     *grpc.Server
+	uniqueName string
+
+	devices     []kubeletdevicepluginv1beta1.Device
+	devicesSync sync.Mutex
+
+	devicesUpdateCh chan struct{}
+
+	calls     []string
+	callsSync sync.Mutex
+
+	errorInjector func(string) error
+}
+
+func NewDevicePlugin(errorInjector func(string) error) *DevicePlugin {
+	return &DevicePlugin{
+		calls:           []string{},
+		devicesUpdateCh: make(chan struct{}),
+		errorInjector:   errorInjector,
+	}
+}
+
+func (dp *DevicePlugin) GetDevicePluginOptions(context.Context, *kubeletdevicepluginv1beta1.Empty) (*kubeletdevicepluginv1beta1.DevicePluginOptions, error) {
+	// lock the mutex and record the call
+	dp.callsSync.Lock()
+	dp.calls = append(dp.calls, "GetDevicePluginOptions")
+	dp.callsSync.Unlock()
+
+	if dp.errorInjector != nil {
+		return &kubeletdevicepluginv1beta1.DevicePluginOptions{}, dp.errorInjector("GetDevicePluginOptions")
+	}
+	return &kubeletdevicepluginv1beta1.DevicePluginOptions{}, nil
+}
+
+func (dp *DevicePlugin) sendDevices(stream kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
+	resp := new(kubeletdevicepluginv1beta1.ListAndWatchResponse)
+
+	dp.devicesSync.Lock()
+	for _, d := range dp.devices {
+		resp.Devices = append(resp.Devices, &d)
+	}
+	dp.devicesSync.Unlock()
+
+	return stream.Send(resp)
+}
+
+func (dp *DevicePlugin) ListAndWatch(empty *kubeletdevicepluginv1beta1.Empty, stream kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
+	dp.callsSync.Lock()
+	dp.calls = append(dp.calls, "ListAndWatch")
+	dp.callsSync.Unlock()
+
+	if dp.errorInjector != nil {
+		if err := dp.errorInjector("ListAndWatch"); err != nil {
+			return err
+		}
+	}
+
+	if err := dp.sendDevices(stream); err != nil {
+		return err
+	}
+
+	// when the devices are updated, send the new device list to the kubelet;
+	// also tick every second and call into dp.errorInjector so that
+	// a device plugin failure can be simulated mid-stream
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-dp.devicesUpdateCh:
+			if err := dp.sendDevices(stream); err != nil {
+				return err
+			}
+		case <-ticker.C:
+			if dp.errorInjector != nil {
+				if err := dp.errorInjector("ListAndWatch"); err != nil {
+					return err
+				}
+			}
+		}
+	}
+}
+
+func (dp *DevicePlugin) Allocate(ctx context.Context, request *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
+	result := new(kubeletdevicepluginv1beta1.AllocateResponse)
+
+	dp.callsSync.Lock()
+	dp.calls = append(dp.calls, "Allocate")
+	dp.callsSync.Unlock()
+
+	for _, r := range request.ContainerRequests {
+		response := &kubeletdevicepluginv1beta1.ContainerAllocateResponse{}
+		for _, id := range r.DevicesIDs {
+			fpath, err := os.CreateTemp("/tmp", fmt.Sprintf("%s-%s", dp.uniqueName, id))
+			gomega.Expect(err).To(gomega.Succeed())
+
+			response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
+				ContainerPath: fpath.Name(),
+				HostPath:      fpath.Name(),
+			})
+		}
+		result.ContainerResponses = append(result.ContainerResponses, response)
+	}
+
+	return result, nil
+}
+
+func (dp *DevicePlugin) PreStartContainer(ctx context.Context, request *kubeletdevicepluginv1beta1.PreStartContainerRequest) (*kubeletdevicepluginv1beta1.PreStartContainerResponse, error) {
+	return nil, nil
+}
+
+func (dp *DevicePlugin) GetPreferredAllocation(ctx context.Context, request *kubeletdevicepluginv1beta1.PreferredAllocationRequest) (*kubeletdevicepluginv1beta1.PreferredAllocationResponse, error) {
+	return nil, nil
+}
+
+func (dp *DevicePlugin) RegisterDevicePlugin(ctx context.Context, uniqueName, resourceName string, devices []kubeletdevicepluginv1beta1.Device) error {
+	ginkgo.GinkgoHelper()
+
+	dp.devicesSync.Lock()
+	dp.devices = devices
+	dp.devicesSync.Unlock()
+
+	devicePluginEndpoint := fmt.Sprintf("%s-%s.sock", "test-device-plugin", uniqueName)
+	dp.uniqueName = uniqueName
+
+	// Register the device plugin with the kubelet:
+	// Create a new gRPC server
+	dp.server = grpc.NewServer()
+	// Register the device plugin with the server
+	kubeletdevicepluginv1beta1.RegisterDevicePluginServer(dp.server, dp)
+	// Create a listener on the device plugin socket
+	lis, err := net.Listen("unix", kubeletdevicepluginv1beta1.DevicePluginPath+devicePluginEndpoint)
+	if err != nil {
+		return err
+	}
+	// Start the gRPC server
+	go func() {
+		err := dp.server.Serve(lis)
+		gomega.Expect(err).To(gomega.Succeed())
+	}()
+
+	// Create a connection to the kubelet
+	conn, err := grpc.NewClient("unix://"+kubeletdevicepluginv1beta1.KubeletSocket,
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err := conn.Close()
+		gomega.Expect(err).To(gomega.Succeed())
+	}()
+
+	// Create a client for the kubelet registration service
+	client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
+
+	// Register the device plugin with the kubelet
+	_, err = client.Register(ctx, &kubeletdevicepluginv1beta1.RegisterRequest{
+		Version:      kubeletdevicepluginv1beta1.Version,
+		Endpoint:     devicePluginEndpoint,
+		ResourceName: resourceName,
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (dp *DevicePlugin) Stop() {
+	if dp.server != nil {
+		dp.server.Stop()
+		dp.server = nil
+	}
+}
+
+func (dp *DevicePlugin) WasCalled(method string) bool {
+	// lock the mutex and check whether the method was called
+	dp.callsSync.Lock()
+	defer dp.callsSync.Unlock()
+	for _, call := range dp.calls {
+		if call == method {
+			return true
+		}
+	}
+	return false
+}
+
+func (dp *DevicePlugin) Calls() []string {
+	// lock the mutex and return a copy of the calls
+	dp.callsSync.Lock()
+	defer dp.callsSync.Unlock()
+	calls := make([]string, len(dp.calls))
+	copy(calls, dp.calls)
+	return calls
+}
+
+func (dp *DevicePlugin) UpdateDevices(devices []kubeletdevicepluginv1beta1.Device) {
+	// lock the mutex and update the devices
+	dp.devicesSync.Lock()
+	defer dp.devicesSync.Unlock()
+	dp.devices = devices
+	dp.devicesUpdateCh <- struct{}{}
+}