From 671c4eb2b79941983d89ef5b07b25b0d546504ad Mon Sep 17 00:00:00 2001 From: Penghao Cen Date: Thu, 11 Jan 2018 14:41:45 +0800 Subject: [PATCH] Add e2e test logic for device plugin --- .../cm/deviceplugin/device_plugin_stub.go | 28 ++- test/e2e_node/device_plugin.go | 162 +++++++++++++++++- test/e2e_node/util.go | 1 + 3 files changed, 188 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go index 01f08c15987..a04389cc192 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go @@ -38,6 +38,18 @@ type Stub struct { update chan []*pluginapi.Device server *grpc.Server + + // allocFunc is used for handling allocation requests + allocFunc stubAllocFunc +} + +// stubAllocFunc is the function called when an allocation request is received from Kubelet +type stubAllocFunc func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) + +func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) { + var response pluginapi.AllocateResponse + + return &response, nil } // NewDevicePluginStub returns an initialized DevicePlugin Stub. 
@@ -48,9 +60,16 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub { stop: make(chan interface{}), update: make(chan []*pluginapi.Device), + + allocFunc: defaultAllocFunc, } } +// SetAllocFunc sets allocFunc of the device plugin +func (m *Stub) SetAllocFunc(f stubAllocFunc) { + m.allocFunc = f +} + // Start starts the gRPC server of the device plugin func (m *Stub) Start() error { err := m.cleanup() @@ -145,8 +164,13 @@ func (m *Stub) Update(devs []*pluginapi.Device) { func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { log.Printf("Allocate, %+v", r) - var response pluginapi.AllocateResponse - return &response, nil + devs := make(map[string]pluginapi.Device) + + for _, dev := range m.devs { + devs[dev.ID] = *dev + } + + return m.allocFunc(r, devs) } func (m *Stub) cleanup() error { diff --git a/test/e2e_node/device_plugin.go b/test/e2e_node/device_plugin.go index 9748d31d68a..826d3b66989 100644 --- a/test/e2e_node/device_plugin.go +++ b/test/e2e_node/device_plugin.go @@ -40,7 +40,130 @@ import ( . 
"github.com/onsi/gomega" ) -// makeBusyboxPod returns a simple Pod spec with a pause container +const ( + // fake resource name + resourceName = "fake.com/resource" +) + +// Serial because the test restarts Kubelet +var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [Disruptive]", func() { + f := framework.NewDefaultFramework("device-plugin-errors") + + Context("DevicePlugin", func() { + By("Enabling support for Device Plugin") + tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) { + initialConfig.FeatureGates[string(features.DevicePlugins)] = true + }) + + It("Verifies the Kubelet device plugin functionality.", func() { + + By("Wait for node is ready") + framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout) + + By("Start stub device plugin") + // fake devices for e2e test + devs := []*pluginapi.Device{ + {ID: "Dev-1", Health: pluginapi.Healthy}, + {ID: "Dev-2", Health: pluginapi.Healthy}, + } + + socketPath := pluginapi.DevicePluginPath + "dp." 
+ fmt.Sprintf("%d", time.Now().Unix()) + + dp1 := dp.NewDevicePluginStub(devs, socketPath) + dp1.SetAllocFunc(stubAllocFunc) + err := dp1.Start() + framework.ExpectNoError(err) + + By("Register resources") + err = dp1.Register(pluginapi.KubeletSocket, resourceName) + framework.ExpectNoError(err) + + By("Waiting for the resource exported by the stub device plugin to become available on the local node") + devsLen := int64(len(devs)) + Eventually(func() int64 { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + return numberOfDevices(node, resourceName) + }, 30*time.Second, framework.Poll).Should(Equal(devsLen)) + + By("Creating one pod on node with at least one fake-device") + podRECMD := "devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs" + pod1 := f.PodClient().CreateSync(makeBusyboxPod(resourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + count1, devId1 := parseLogFromNRuns(f, pod1.Name, pod1.Name, 0, deviceIDRE) + Expect(devId1).To(Not(Equal(""))) + + pod1, err = f.PodClient().Get(pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + By("Restarting Kubelet and waiting for the current running pod to restart") + restartKubelet() + + By("Confirming that after a kubelet and pod restart, fake-device assignement is kept") + count1, devIdRestart1 := parseLogFromNRuns(f, pod1.Name, pod1.Name, count1+1, deviceIDRE) + Expect(devIdRestart1).To(Equal(devId1)) + + By("Wait for node is ready") + framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout) + + By("Re-Register resources") + dp1 = dp.NewDevicePluginStub(devs, socketPath) + dp1.SetAllocFunc(stubAllocFunc) + err = dp1.Start() + framework.ExpectNoError(err) + + err = dp1.Register(pluginapi.KubeletSocket, resourceName) + framework.ExpectNoError(err) + + By("Waiting for resource to become available on the local node after re-registration") + 
Eventually(func() int64 { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + return numberOfDevices(node, resourceName) + }, 30*time.Second, framework.Poll).Should(Equal(devsLen)) + + By("Creating another pod") + pod2 := f.PodClient().CreateSync(makeBusyboxPod(resourceName, podRECMD)) + + By("Checking that pods got a different GPU") + count2, devId2 := parseLogFromNRuns(f, pod2.Name, pod2.Name, 1, deviceIDRE) + + Expect(devId1).To(Not(Equal(devId2))) + + By("Deleting device plugin.") + err = dp1.Stop() + framework.ExpectNoError(err) + + By("Waiting for stub device plugin to become unavailable on the local node") + Eventually(func() bool { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + return numberOfDevices(node, resourceName) <= 0 + }, 10*time.Minute, framework.Poll).Should(BeTrue()) + + By("Checking that scheduled pods can continue to run even after we delete device plugin.") + count1, devIdRestart1 = parseLogFromNRuns(f, pod1.Name, pod1.Name, count1+1, deviceIDRE) + Expect(devIdRestart1).To(Equal(devId1)) + count2, devIdRestart2 := parseLogFromNRuns(f, pod2.Name, pod2.Name, count2+1, deviceIDRE) + Expect(devIdRestart2).To(Equal(devId2)) + + By("Restarting Kubelet.") + restartKubelet() + + By("Checking that scheduled pods can continue to run even after we delete device plugin and restart Kubelet.") + count1, devIdRestart1 = parseLogFromNRuns(f, pod1.Name, pod1.Name, count1+2, deviceIDRE) + Expect(devIdRestart1).To(Equal(devId1)) + count2, devIdRestart2 = parseLogFromNRuns(f, pod2.Name, pod2.Name, count2+2, deviceIDRE) + Expect(devIdRestart2).To(Equal(devId2)) + + // Cleanup + f.PodClient().DeleteSync(pod1.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + f.PodClient().DeleteSync(pod2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + }) + }) +}) + +// 
makeBusyboxPod returns a simple Pod spec with a busybox container // that requests resourceName and runs the specified command. func makeBusyboxPod(resourceName, cmd string) *v1.Pod { podName := "device-plugin-test-" + string(uuid.NewUUID()) @@ -78,16 +201,19 @@ func parseLogFromNRuns(f *framework.Framework, podName string, contName string, count = p.Status.ContainerStatuses[0].RestartCount return count >= restartCount }, 5*time.Minute, framework.Poll).Should(BeTrue()) + logs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, podName, contName) if err != nil { framework.Failf("GetPodLogs for pod %q failed: %v", podName, err) } + framework.Logf("got pod logs: %v", logs) regex := regexp.MustCompile(re) matches := regex.FindStringSubmatch(logs) if len(matches) < 2 { return count, "" } + return count, matches[1] } @@ -100,3 +226,37 @@ func numberOfDevices(node *v1.Node, resourceName string) int64 { return val.Value() } + +// stubAllocFunc is the allocation handler passed to the stub device plugin +func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) { + var response pluginapi.AllocateResponse + for _, requestID := range r.DevicesIDs { + dev, ok := devs[requestID] + if !ok { + return nil, fmt.Errorf("invalid allocation request with non-existing device %s", requestID) + } + + if dev.Health != pluginapi.Healthy { + return nil, fmt.Errorf("invalid allocation request with unhealthy device: %s", requestID) + } + + // create fake device file + fpath := filepath.Join("/tmp", dev.ID) + + // clean first + os.RemoveAll(fpath) + f, err := os.Create(fpath) + if err != nil && !os.IsExist(err) { + return nil, fmt.Errorf("failed to create fake device file: %s", err) + } + + f.Close() + + response.Mounts = append(response.Mounts, &pluginapi.Mount{ + ContainerPath: fpath, + HostPath: fpath, + }) + } + + return &response, nil +} diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index f81ab6f5d8b..9a9e39b91eb 100644 --- 
a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -24,6 +24,7 @@ import ( "net/http" "os/exec" "reflect" + "regexp" "strings" "time"