Merge pull request #125753 from SergeyKanzhelev/devicePluginFailuresTests
device plugin failure tests
This commit is contained in: commit 157f4b94d8

test/e2e_node/device_plugin_failures_test.go (new file, 356 lines)
@@ -0,0 +1,356 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2enode

import (
	"context"
	"fmt"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	v1 "k8s.io/api/core/v1"
	kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	admissionapi "k8s.io/pod-security-admission/api"

	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e_node/testdeviceplugin"
)
type ResourceValue struct {
	Allocatable int
	Capacity    int
}

// Serial because the test restarts Kubelet
var _ = SIGDescribe("Device Plugin Failures:", framework.WithNodeConformance(), func() {
	f := framework.NewDefaultFramework("device-plugin-failures")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue {
		ginkgo.GinkgoHelper()
		node := getLocalNode(ctx, f)

		// -1 represents that the resource is not found
		result := ResourceValue{
			Allocatable: -1,
			Capacity:    -1,
		}

		for key, val := range node.Status.Capacity {
			resource := string(key)
			if resource == resourceName {
				result.Capacity = int(val.Value())
				break
			}
		}

		for key, val := range node.Status.Allocatable {
			resource := string(key)
			if resource == resourceName {
				result.Allocatable = int(val.Value())
				break
			}
		}

		return result
	}
	var createPod = func(resourceName string, quantity int) *v1.Pod {
		ginkgo.GinkgoHelper()
		rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)}
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())},
			Spec: v1.PodSpec{
				RestartPolicy: v1.RestartPolicyAlways,
				Containers: []v1.Container{{
					Image:   busyboxImage,
					Name:    "container-1",
					Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)},
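					// Extended resources can only be requested via limits; requests, when set,
					// must equal limits, which is why both are populated from the same ResourceList.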
					Resources: v1.ResourceRequirements{
						Limits:   rl,
						Requests: rl,
					},
				}},
			},
		}
		return pod
	}

	nodeStatusUpdateTimeout := 1 * time.Minute
	devicePluginUpdateTimeout := 1 * time.Minute
	devicePluginGracefulTimeout := 5 * time.Minute // see endpointStopGracePeriod in pkg/kubelet/cm/devicemanager/types.go
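	// Once a plugin endpoint stops, the device manager keeps its devices (marked unhealthy) in the
	// node status for endpointStopGracePeriod before zeroing out the capacity; the assertions below
	// therefore wait devicePluginGracefulTimeout plus an extra minute for the post-grace-period state.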

	ginkgo.It("when GetDevicePluginOptions fails, device plugin will not be used", func(ctx context.Context) {
		// randomizing so tests can run in parallel
		resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)

		expectedErr := fmt.Errorf("GetDevicePluginOptions failed")

		plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
			if name == "GetDevicePluginOptions" {
				return expectedErr
			}
			return nil
		})

		err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}})
		defer plugin.Stop() // should stop even if registration failed
		gomega.Expect(err).To(gomega.MatchError(gomega.ContainSubstring("failed to get device plugin options")))
		gomega.Expect(err).To(gomega.MatchError(gomega.ContainSubstring(expectedErr.Error())))

		gomega.Expect(plugin.WasCalled("ListAndWatch")).To(gomega.BeFalseBecause("plugin should not be used if GetDevicePluginOptions fails"))
		gomega.Expect(plugin.WasCalled("GetDevicePluginOptions")).To(gomega.BeTrueBecause("get device plugin options should be called exactly once"))
		gomega.Expect(plugin.Calls()).To(gomega.HaveLen(1))

		// kubelet will not even register the resource
		gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
	})
ginkgo.It("will set allocatable to zero when a single device became unhealthy and then back to 1 if it got healthy again", func(ctx context.Context) {
|
||||
// randomizing so tests can run in parallel
|
||||
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
|
||||
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
|
||||
plugin := testdeviceplugin.NewDevicePlugin(nil)
|
||||
|
||||
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
|
||||
defer plugin.Stop() // should stop even if registration failed
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
// at first the device is healthy
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
|
||||
|
||||
// now make the device unhealthy
|
||||
devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
|
||||
|
||||
// now make the device healthy again
|
||||
devices[0].Health = kubeletdevicepluginv1beta1.Healthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
|
||||
})
|
||||
|
||||
ginkgo.It("will set allocatable to zero when a single device became unhealthy, but capacity will stay at 1", func(ctx context.Context) {
|
||||
// randomizing so tests can run in parallel
|
||||
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
|
||||
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
|
||||
plugin := testdeviceplugin.NewDevicePlugin(nil)
|
||||
|
||||
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
|
||||
defer plugin.Stop() // should stop even if registration failed
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
ginkgo.By("initial state: capacity and allocatable are set")
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
|
||||
|
||||
// schedule a pod that requests the device
|
||||
client := e2epod.NewPodClient(f)
|
||||
pod := client.Create(ctx, createPod(resourceName, 1))
|
||||
|
||||
// wait for the pod to be running
|
||||
gomega.Expect(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)).To(gomega.Succeed())
|
||||
|
||||
ginkgo.By("once pod is running, it does not affect allocatable value")
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
|
||||
|
||||
// now make the device unhealthy
|
||||
devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
ginkgo.By("even when device became unhealthy. pod is still running and keeping the capacity")
|
||||
// we keep the allocatable at the same value even though device is not healthy any longer
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
|
||||
|
||||
// pod is not affected by the device becoming unhealthy
|
||||
|
||||
gomega.Consistently(func() v1.PodPhase {
|
||||
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
|
||||
return pod.Status.Phase
|
||||
}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(v1.PodRunning))
|
||||
|
||||
// deleting the pod
|
||||
err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
// wait for the pod to be deleted
|
||||
gomega.Eventually(func() error {
|
||||
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
|
||||
return err
|
||||
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.MatchError((gomega.ContainSubstring("not found"))))
|
||||
|
||||
ginkgo.By("when pod is deleted, nothing changes")
|
||||
gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
|
||||
})
|
||||
|
||||
ginkgo.It("will lower allocatable to a number of unhealthy devices and then back if they became healthy again", func(ctx context.Context) {
|
||||
// randomizing so tests can run in parallel
|
||||
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
|
||||
|
||||
devices := []kubeletdevicepluginv1beta1.Device{
|
||||
{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
{ID: "2", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
{ID: "3", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
}
|
||||
plugin := testdeviceplugin.NewDevicePlugin(nil)
|
||||
|
||||
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
|
||||
defer plugin.Stop() // should stop even if registration failed
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
// at first all the devices are healthy
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
|
||||
|
||||
// now make one device unhealthy
|
||||
devices[3].Health = kubeletdevicepluginv1beta1.Unhealthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 3, Capacity: 4}))
|
||||
|
||||
// now make the device healthy again
|
||||
devices[3].Health = kubeletdevicepluginv1beta1.Healthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
|
||||
|
||||
// now make two devices unhealthy
|
||||
devices[1].Health = kubeletdevicepluginv1beta1.Unhealthy
|
||||
devices[3].Health = kubeletdevicepluginv1beta1.Unhealthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 4}))
|
||||
|
||||
// now make the device healthy again
|
||||
devices[3].Health = kubeletdevicepluginv1beta1.Healthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 3, Capacity: 4}))
|
||||
|
||||
// now make the device healthy again
|
||||
devices[1].Health = kubeletdevicepluginv1beta1.Healthy
|
||||
plugin.UpdateDevices(devices)
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
|
||||
})
|
||||
|
||||
ginkgo.It("when ListAndWatch fails immediately, node allocatable will be set to zero and kubelet will not retry to list resources", func(ctx context.Context) {
|
||||
// randomizing so tests can run in parallel
|
||||
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
|
||||
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
|
||||
|
||||
// Initially, there are no allocatable of this resource
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
|
||||
|
||||
plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
|
||||
if name == "ListAndWatch" {
|
||||
return fmt.Errorf("ListAndWatch failed")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
|
||||
defer plugin.Stop() // should stop even if registration failed
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
// kubelet registers the resource, but will not have any allocatable
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
|
||||
|
||||
// kubelet will never retry ListAndWatch (this will sleep for a long time)
|
||||
gomega.Consistently(plugin.Calls, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.HaveLen(2))
|
||||
|
||||
// however kubelet will not delete the resource
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
|
||||
})
|
||||
|
||||
ginkgo.It("when ListAndWatch fails after provisioning devices, node allocatable will be set to zero and kubelet will not retry to list resources", func(ctx context.Context) {
|
||||
// randomizing so tests can run in parallel
|
||||
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
|
||||
devices := []kubeletdevicepluginv1beta1.Device{
|
||||
{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
}
|
||||
|
||||
failing := false
|
||||
plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
|
||||
if name == "ListAndWatch" {
|
||||
if failing {
|
||||
return fmt.Errorf("ListAndWatch failed")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
|
||||
defer plugin.Stop() // should stop even if registration failed
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
// at first the device is healthy
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 2}))
|
||||
|
||||
// let's make ListAndWatch fail
|
||||
failing = true
|
||||
|
||||
// kubelet will mark all devices as unhealthy
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
|
||||
|
||||
// kubelet will never retry ListAndWatch (this will sleep for a long time)
|
||||
gomega.Consistently(plugin.Calls, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.HaveLen(2))
|
||||
|
||||
// however kubelet will not delete the resource and will keep the capacity
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
|
||||
|
||||
// after the graceful period devices capacity will reset to zero
|
||||
gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
|
||||
})
|
||||
|
||||
ginkgo.It("when device plugin is stopped after provisioning devices, node allocatable will be set to zero", func(ctx context.Context) {
|
||||
// randomizing so tests can run in parallel
|
||||
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
|
||||
devices := []kubeletdevicepluginv1beta1.Device{
|
||||
{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
|
||||
}
|
||||
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
|
||||
|
||||
plugin := testdeviceplugin.NewDevicePlugin(nil)
|
||||
|
||||
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
|
||||
defer plugin.Stop() // should stop even if registration failed
|
||||
gomega.Expect(err).To(gomega.Succeed())
|
||||
|
||||
// at first the device is healthy
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 2}))
|
||||
|
||||
// let's unload the plugin
|
||||
plugin.Stop()
|
||||
|
||||
// kubelet will mark all devices as unhealthy
|
||||
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
|
||||
|
||||
// after the graceful period devices capacity will reset to zero
|
||||
gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
|
||||
})
|
||||
})
|
test/e2e_node/testdeviceplugin/device-plugin.go (new file, 237 lines)
@@ -0,0 +1,237 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testdeviceplugin

import (
	"context"
	"fmt"
	"net"
	"os"
	"sync"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
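
// DevicePlugin is a minimal in-process device plugin used by node e2e tests: it records every
// incoming gRPC call and lets a test inject an error for a chosen method via the errorInjector
// callback passed to NewDevicePlugin.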
type DevicePlugin struct {
	server     *grpc.Server
	uniqueName string

	devices     []kubeletdevicepluginv1beta1.Device
	devicesSync sync.Mutex

	devicesUpdateCh chan struct{}

	calls     []string
	callsSync sync.Mutex

	errorInjector func(string) error
}

func NewDevicePlugin(errorInjector func(string) error) *DevicePlugin {
	return &DevicePlugin{
		calls:           []string{},
		devicesUpdateCh: make(chan struct{}),
		errorInjector:   errorInjector,
	}
}

func (dp *DevicePlugin) GetDevicePluginOptions(context.Context, *kubeletdevicepluginv1beta1.Empty) (*kubeletdevicepluginv1beta1.DevicePluginOptions, error) {
	// lock the mutex and add to a list of calls
	dp.callsSync.Lock()
	dp.calls = append(dp.calls, "GetDevicePluginOptions")
	dp.callsSync.Unlock()

	if dp.errorInjector != nil {
		return &kubeletdevicepluginv1beta1.DevicePluginOptions{}, dp.errorInjector("GetDevicePluginOptions")
	}
	return &kubeletdevicepluginv1beta1.DevicePluginOptions{}, nil
}

func (dp *DevicePlugin) sendDevices(stream kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
	resp := new(kubeletdevicepluginv1beta1.ListAndWatchResponse)

	dp.devicesSync.Lock()
	for _, d := range dp.devices {
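		// Taking &d relies on Go 1.22+ per-iteration loop variables; with the older loop
		// semantics every entry would point at the same variable.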
		resp.Devices = append(resp.Devices, &d)
	}
	dp.devicesSync.Unlock()

	return stream.Send(resp)
}

func (dp *DevicePlugin) ListAndWatch(empty *kubeletdevicepluginv1beta1.Empty, stream kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
	dp.callsSync.Lock()
	dp.calls = append(dp.calls, "ListAndWatch")
	dp.callsSync.Unlock()

	if dp.errorInjector != nil {
		if err := dp.errorInjector("ListAndWatch"); err != nil {
			return err
		}
	}

	if err := dp.sendDevices(stream); err != nil {
		return err
	}

	// When the devices are updated, send the new devices to the kubelet.
	// Also tick every second and call into dp.errorInjector so a test can
	// simulate a device plugin failure mid-stream.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-dp.devicesUpdateCh:
			if err := dp.sendDevices(stream); err != nil {
				return err
			}
		case <-ticker.C:
			if dp.errorInjector != nil {
				if err := dp.errorInjector("ListAndWatch"); err != nil {
					return err
				}
			}
		}
	}
}

func (dp *DevicePlugin) Allocate(ctx context.Context, request *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
	result := new(kubeletdevicepluginv1beta1.AllocateResponse)

	dp.callsSync.Lock()
	dp.calls = append(dp.calls, "Allocate")
	dp.callsSync.Unlock()

	for _, r := range request.ContainerRequests {
		response := &kubeletdevicepluginv1beta1.ContainerAllocateResponse{}
		for _, id := range r.DevicesIDs {
			fpath, err := os.CreateTemp("/tmp", fmt.Sprintf("%s-%s", dp.uniqueName, id))
			gomega.Expect(err).To(gomega.Succeed())

			response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
				ContainerPath: fpath.Name(),
				HostPath:      fpath.Name(),
			})
		}
		result.ContainerResponses = append(result.ContainerResponses, response)
	}

	return result, nil
}

func (dp *DevicePlugin) PreStartContainer(ctx context.Context, request *kubeletdevicepluginv1beta1.PreStartContainerRequest) (*kubeletdevicepluginv1beta1.PreStartContainerResponse, error) {
	return nil, nil
}

func (dp *DevicePlugin) GetPreferredAllocation(ctx context.Context, request *kubeletdevicepluginv1beta1.PreferredAllocationRequest) (*kubeletdevicepluginv1beta1.PreferredAllocationResponse, error) {
	return nil, nil
}

func (dp *DevicePlugin) RegisterDevicePlugin(ctx context.Context, uniqueName, resourceName string, devices []kubeletdevicepluginv1beta1.Device) error {
	ginkgo.GinkgoHelper()

	dp.devicesSync.Lock()
	dp.devices = devices
	dp.devicesSync.Unlock()

	devicePluginEndpoint := fmt.Sprintf("%s-%s.sock", "test-device-plugin", uniqueName)
	dp.uniqueName = uniqueName

	// Serve the device plugin API, then register the endpoint with the kubelet.
	// Create a new gRPC server
	dp.server = grpc.NewServer()
	// Register the device plugin with the server
	kubeletdevicepluginv1beta1.RegisterDevicePluginServer(dp.server, dp)
	// Create a listener on the device plugin unix socket
	lis, err := net.Listen("unix", kubeletdevicepluginv1beta1.DevicePluginPath+devicePluginEndpoint)
	if err != nil {
		return err
	}
	// Start the gRPC server
	go func() {
		err := dp.server.Serve(lis)
		gomega.Expect(err).To(gomega.Succeed())
	}()

	// Create a connection to the kubelet
	conn, err := grpc.NewClient("unix://"+kubeletdevicepluginv1beta1.KubeletSocket,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return err
	}
	defer func() {
		err := conn.Close()
		gomega.Expect(err).To(gomega.Succeed())
	}()

	// Create a registration client for the kubelet
	client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)

	// Register the device plugin with the kubelet
	_, err = client.Register(ctx, &kubeletdevicepluginv1beta1.RegisterRequest{
		Version:      kubeletdevicepluginv1beta1.Version,
		Endpoint:     devicePluginEndpoint,
		ResourceName: resourceName,
	})
	if err != nil {
		return err
	}
	return nil
}

func (dp *DevicePlugin) Stop() {
	if dp.server != nil {
		dp.server.Stop()
		dp.server = nil
	}
}

func (dp *DevicePlugin) WasCalled(method string) bool {
	// lock the mutex and then search if the method was called
	dp.callsSync.Lock()
	defer dp.callsSync.Unlock()
	for _, call := range dp.calls {
		if call == method {
			return true
		}
	}
	return false
}

func (dp *DevicePlugin) Calls() []string {
	// lock the mutex and return the calls
	dp.callsSync.Lock()
	defer dp.callsSync.Unlock()
	// return a copy of the calls
	calls := make([]string, len(dp.calls))
	copy(calls, dp.calls)
	return calls
}

func (dp *DevicePlugin) UpdateDevices(devices []kubeletdevicepluginv1beta1.Device) {
	// lock the mutex and update the devices
	dp.devicesSync.Lock()
	defer dp.devicesSync.Unlock()
	dp.devices = devices
	dp.devicesUpdateCh <- struct{}{}
}
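
For reference, a minimal usage sketch of the testdeviceplugin helper above (illustrative only: the function name, resource name, and device ID are examples, and the imports are assumed to match device_plugin_failures_test.go):

// exampleUsage registers a fake extended resource, flips its only device to unhealthy,
// and relies on the deferred Stop to shut the plugin's gRPC server down.
func exampleUsage(ctx context.Context) error {
	plugin := testdeviceplugin.NewDevicePlugin(nil) // nil: no error injection
	defer plugin.Stop()                             // safe to call even if registration failed

	devices := []kubeletdevicepluginv1beta1.Device{{ID: "dev-0", Health: kubeletdevicepluginv1beta1.Healthy}}
	if err := plugin.RegisterDevicePlugin(ctx, "example", "test.device/example", devices); err != nil {
		return err
	}

	// Push an update over the ListAndWatch stream; the kubelet is expected to drop
	// allocatable to zero while keeping the device in capacity.
	devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
	plugin.UpdateDevices(devices)
	return nil
}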