node: device-mgr: e2e: Update the e2e test to reproduce issue:109595
Breakdown of the steps implemented as part of this e2e test is as follows:

1. Create a file `registration` at path `/var/lib/kubelet/device-plugins/sample/`.
2. Create the sample device plugin with the environment variable `REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration`, so that the plugin waits for a client to delete the control file before registering.
3. Trigger plugin registration by deleting the above-mentioned control file.
4. Create a test pod requesting devices exposed by the device plugin.
5. Stop kubelet.
6. Remove pods using CRI to ensure new pods are created after kubelet restart.
7. Restart kubelet.
8. Wait for the sample device plugin pod to be running. In this case, registration is not triggered.
9. Ensure that the resource capacity/allocatable exported by the device plugin is zero.
10. The test pod should fail with `UnexpectedAdmissionError`.
11. Delete the test pod.
12. Delete the sample device plugin pod.
13. Remove `/var/lib/kubelet/device-plugins/sample/` and its content, the directory created to control registration.

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
parent db7afc1cd8
commit 674879a959
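Step 2 of the commit message relies on the sample device plugin deferring its registration until the control file disappears. The actual logic lives in `test/images/sample-device-plugin/sampledeviceplugin.go` and is not reproduced in this diff; the snippet below is only a minimal, hypothetical sketch of the idea, assuming a simple polling loop (the function name and the one-second poll interval are illustrative, and the real plugin may use file-system notifications instead).

```go
package main

import (
	"errors"
	"log"
	"os"
	"time"
)

// waitForControlFileRemoval blocks until the file named by REGISTER_CONTROL_FILE
// is deleted, which is the signal used in this e2e test to allow registration
// with the kubelet to proceed. If the variable is unset, it returns immediately,
// preserving the default auto-registration behaviour.
func waitForControlFileRemoval() {
	controlFile := os.Getenv("REGISTER_CONTROL_FILE")
	if controlFile == "" {
		return // no control file configured: auto-register as before
	}
	log.Printf("waiting for control file %q to be deleted before registering", controlFile)
	for {
		if _, err := os.Stat(controlFile); errors.Is(err, os.ErrNotExist) {
			return // the test removed the file: registration may proceed
		}
		time.Sleep(time.Second)
	}
}

func main() {
	waitForControlFileRemoval()
	log.Println("registering with the kubelet (plugin-specific gRPC call omitted in this sketch)")
}
```

In the test itself, the registration trigger is simply `os.Remove(triggerPathFile)`, as the diff below shows.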
@@ -18,13 +18,13 @@ package e2enode

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
@@ -32,6 +32,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@@ -273,12 +274,56 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur

	})

	ginkgo.Context("With sample device plugin", func(ctx context.Context) {
	/*
		This end to end test is to simulate a scenario where after kubelet restart/node
		reboot application pods requesting devices appear before the device plugin
		pod exposing those devices as resources.

		The happy path is where after node reboot/kubelet restart, the device plugin pod
		appears before the application pod. This PR and this e2e test
		aim to tackle the scenario where the device plugin either does not appear first
		or doesn't get the chance to re-register itself.

		Since there is no way of controlling the order in which the pods appear after
		kubelet restart/node reboot, we can't guarantee that the application pod
		recovers before the device plugin pod (the scenario we want to exercise here).
		If the device plugin pod is recovered before the test pod, we can still
		meaningfully reproduce the scenario by NOT sending the registration command.
		To do so, the sample device plugin is enhanced. For implementation details, refer to:
		`test/images/sample-device-plugin/sampledeviceplugin.go`. This enhancement
		allows auto-registration of the plugin to be controlled with the help of an environment
		variable: REGISTER_CONTROL_FILE. By default this environment variable is not present
		and the device plugin auto-registers to kubelet. For this e2e test, we use a sample device
		plugin spec with REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration
		so that the plugin must be registered manually. This lets an application pod (requesting devices)
		successfully run on the node; after a kubelet restart the device plugin doesn't
		re-register and the application pod fails with an admission error.

		Breakdown of the steps implemented as part of this e2e test is as follows:
		1. Create a file `registration` at path `/var/lib/kubelet/device-plugins/sample/`.
		2. Create the sample device plugin with an environment variable
		   `REGISTER_CONTROL_FILE=/var/lib/kubelet/device-plugins/sample/registration` that
		   makes it wait for a client to delete the control file.
		3. Trigger plugin registration by deleting the above-mentioned control file.
		4. Create a test pod requesting devices exposed by the device plugin.
		5. Stop kubelet.
		6. Remove pods using CRI to ensure new pods are created after kubelet restart.
		7. Restart kubelet.
		8. Wait for the sample device plugin pod to be running. In this case,
		   the registration is not triggered.
		9. Ensure that resource capacity/allocatable exported by the device plugin is zero.
		10. The test pod should fail with `UnexpectedAdmissionError`.
		11. Delete the test pod.
		12. Delete the sample device plugin pod.
		13. Remove `/var/lib/kubelet/device-plugins/sample/` and its content, the directory created to control registration.
	*/
ginkgo.Context("With sample device plugin", func() {
|
||||
var deviceCount int = 2
|
||||
var devicePluginPod *v1.Pod
|
||||
var testPods []*v1.Pod
|
||||
var triggerPathFile, triggerPathDir string
|
||||
|
||||
ginkgo.BeforeEach(func() {
|
||||
// this test wants to reproduce what happened in https://github.com/kubernetes/kubernetes/issues/109595
|
||||
ginkgo.BeforeEach(func(ctx context.Context) {
|
||||
ginkgo.By("Wait for node to be ready")
|
||||
gomega.Eventually(func() bool {
|
||||
nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
|
||||
@@ -286,8 +331,28 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
				return nodes == 1
			}, time.Minute, time.Second).Should(gomega.BeTrue())

			ginkgo.By("Setting up the directory and file for controlling registration")
			triggerPathDir = filepath.Join(devicePluginDir, "sample")
			if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) {
				err := os.Mkdir(triggerPathDir, os.ModePerm)
				if err != nil {
					klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err)
					panic(err)
				}
				klog.InfoS("Directory created successfully")

				triggerPathFile = filepath.Join(triggerPathDir, "registration")
				if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) {
					_, err = os.Create(triggerPathFile)
					if err != nil {
						klog.Errorf("File creation %s failed: %v ", triggerPathFile, err)
						panic(err)
					}
				}
			}

			ginkgo.By("Scheduling a sample device plugin pod")
			data, err := e2etestfiles.Read(SampleDevicePluginDS2YAML)
			data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML)
			if err != nil {
				framework.Fail(err.Error())
			}
@@ -302,6 +367,18 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur

			devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp)

			go func() {
				// Since autoregistration is disabled for the device plugin (as the REGISTER_CONTROL_FILE
				// environment variable is specified), device plugin registration needs to be triggered
				// manually.
				// This is done by deleting the control file at the following path:
				// `/var/lib/kubelet/device-plugins/sample/registration`.

				framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile)
				err := os.Remove(triggerPathFile)
				framework.ExpectNoError(err)
			}()

			ginkgo.By("Waiting for devices to become available on the local node")
			gomega.Eventually(func() bool {
				node, ready := getLocalTestNode(ctx, f)
@@ -319,45 +396,20 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
		})

		ginkgo.AfterEach(func() {
			ginkgo.By("Deleting the device plugin pod")
			e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute)

			ginkgo.By("Deleting any Pods created by the test")
			l, err := e2epod.NewPodClient(f).List(context.TODO(), metav1.ListOptions{})
			framework.ExpectNoError(err)
			for _, p := range l.Items {
				if p.Namespace != f.Namespace.Name {
					continue
				}

				framework.Logf("Deleting pod: %s", p.Name)
				e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
			}

			restartKubelet(true)

			ginkgo.By("Waiting for devices to become unavailable on the local node")
			gomega.Eventually(func() bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && numberOfSampleResources(node) <= 0
			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
		})

		ginkgo.It("should recover device plugin pod first, then pod consuming devices", func() {
		ginkgo.It("should deploy pod consuming devices first but fail with admission error after kubelet restart in case device plugin hasn't re-registered", func(ctx context.Context) {
			var err error
			podCMD := "cat /tmp/Dev-* && sleep inf"
			podCMD := "while true; do sleep 1000; done;"

			ginkgo.By(fmt.Sprintf("creating %d pods requiring %q", deviceCount, resourceName))
			var devReqPods []*v1.Pod
			for idx := 0; idx < deviceCount; idx++ {
				pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD)
				devReqPods = append(devReqPods, pod)
			}
			testPods = e2epod.NewPodClient(f).CreateBatch(ctx, devReqPods)
ginkgo.By(fmt.Sprintf("creating a pods requiring %d %q", deviceCount, resourceName))
|
||||

			pod := makeBusyboxDeviceRequiringPod(resourceName, podCMD)
			testPod := e2epod.NewPodClient(f).CreateSync(ctx, pod)

			ginkgo.By("making sure all the pods are ready")
			waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady)

			err = e2epod.WaitForPodCondition(ctx, f.ClientSet, testPod.Namespace, testPod.Name, "Ready", 120*time.Second, testutils.PodRunningReady)
			framework.ExpectNoError(err, "pod %s/%s did not go running", testPod.Namespace, testPod.Name)
			framework.Logf("pod %s/%s running", testPod.Namespace, testPod.Name)

			ginkgo.By("stopping the kubelet")
			startKubelet := stopKubelet()
@@ -384,15 +436,88 @@ var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeatur
				nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
				framework.ExpectNoError(err)
				return nodes == 1
			}, time.Minute, time.Second).Should(gomega.BeTrue())

				// TODO: here we need to tolerate pods waiting to be recreated
			}, 2*time.Minute, time.Second).Should(gomega.BeTrue())

			ginkgo.By("making sure all the pods are ready after the recovery")
			waitForPodConditionBatch(ctx, f, testPods, "Ready", 120*time.Second, testutils.PodRunningReady)

			ginkgo.By("removing all the pods")
			deleteBatch(ctx, f, testPods)
			var devicePluginPodAfterRestart, tmpPod *v1.Pod

			devicePluginPodAfterRestart, err = e2epod.NewPodClient(f).Get(ctx, devicePluginPod.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			err = e2epod.WaitForPodCondition(ctx, f.ClientSet, devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name, "Ready", 120*time.Second, testutils.PodRunningReady)
			framework.ExpectNoError(err, "pod %s/%s did not go running", devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name)
			framework.Logf("pod %s/%s running", devicePluginPodAfterRestart.Namespace, devicePluginPodAfterRestart.Name)

ginkgo.By("Waiting for the resource capacity/allocatable exported by the sample device plugin to become zero")
|
||||
|
||||
// The device plugin pod has restarted but has not re-registered to kubelet (as AUTO_REGISTER= false)
|
||||
// and registration wasn't triggered manually (by writing to the unix socket exposed at
|
||||
// `/var/lib/kubelet/device-plugins/registered`). Because of this, the capacity and allocatable corresponding
|
||||
// to the resource exposed by the device plugin should be zero.
|
||||
			gomega.Eventually(func() bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready &&
					numberOfDevicesCapacity(node, resourceName) == 0 &&
					numberOfDevicesAllocatable(node, resourceName) == 0
			}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())

			ginkgo.By("Checking that pod requesting devices failed to start because of admission error")

			// NOTE: The device plugin won't re-register again and this is intentional.
			// Because of this, the testpod (requesting a device) should fail with an admission error.

			gomega.Eventually(ctx, func() bool {
				tmpPod, err = e2epod.NewPodClient(f).Get(ctx, testPod.Name, metav1.GetOptions{})
				framework.ExpectNoError(err)

				if tmpPod.Status.Phase != v1.PodFailed {
					return false
				}

				if tmpPod.Status.Reason != "UnexpectedAdmissionError" {
					return false
				}

				if !strings.Contains(tmpPod.Status.Message, "Allocate failed due to can't allocate unhealthy devices") {
					return false
				}

				return true
			}, time.Minute, 5*time.Second).Should(
				gomega.Equal(true),
"the pod succeeded to start, when it should fail with the admission error",
|
||||
			)

			ginkgo.By("removing application pods")
			e2epod.NewPodClient(f).DeleteSync(ctx, tmpPod.Name, metav1.DeleteOptions{}, 2*time.Minute)
		})

		ginkgo.AfterEach(func(ctx context.Context) {
			ginkgo.By("Deleting the device plugin pod")
			e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute)

			ginkgo.By("Deleting the directory and file setup for controlling registration")
			err := os.RemoveAll(triggerPathDir)
			framework.ExpectNoError(err)

			ginkgo.By("Deleting any Pods created by the test")
			l, err := e2epod.NewPodClient(f).List(context.TODO(), metav1.ListOptions{})
			framework.ExpectNoError(err)
			for _, p := range l.Items {
				if p.Namespace != f.Namespace.Name {
					continue
				}

				framework.Logf("Deleting pod: %s", p.Name)
				e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
			}

			ginkgo.By("Waiting for devices to become unavailable on the local node")
			gomega.Eventually(func() bool {
				node, ready := getLocalTestNode(ctx, f)
				return ready && numberOfSampleResources(node) <= 0
			}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
		})

	})
@@ -483,41 +608,10 @@ func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) s
	return strings.Join(entries, ", ")
}

func waitForPodConditionBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod, conditionDesc string, timeout time.Duration, condition func(pod *v1.Pod) (bool, error)) {
	var wg sync.WaitGroup
	for _, pod := range pods {
		wg.Add(1)
		go func(podNS, podName string) {
			defer ginkgo.GinkgoRecover()
			defer wg.Done()

			err := e2epod.WaitForPodCondition(ctx, f.ClientSet, podNS, podName, conditionDesc, timeout, condition)
			framework.ExpectNoError(err, "pod %s/%s did not go running", podNS, podName)
			framework.Logf("pod %s/%s running", podNS, podName)
		}(pod.Namespace, pod.Name)
	}
	wg.Wait()
}

func deleteBatch(ctx context.Context, f *framework.Framework, pods []*v1.Pod) {
	var wg sync.WaitGroup
	for _, pod := range pods {
		wg.Add(1)
		go func(podNS, podName string) {
			defer ginkgo.GinkgoRecover()
			defer wg.Done()

			deletePodSyncByName(ctx, f, podName)
			waitForAllContainerRemoval(ctx, podName, podNS)
		}(pod.Namespace, pod.Name)
	}
	wg.Wait()
}

func makeBusyboxDeviceRequiringPod(resourceName, cmd string) *v1.Pod {
	podName := "device-manager-test-" + string(uuid.NewUUID())
	rl := v1.ResourceList{
		v1.ResourceName(resourceName): *resource.NewQuantity(1, resource.DecimalSI),
		v1.ResourceName(resourceName): *resource.NewQuantity(2, resource.DecimalSI),
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
@@ -87,11 +87,16 @@ func updateImageAllowList(ctx context.Context) {
	} else {
		e2epod.ImagePrePullList.Insert(gpuDevicePluginImage)
	}
	if samplePluginImage, err := getSampleDevicePluginImage(); err != nil {
	if samplePluginImage, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginDSYAML); err != nil {
		klog.Errorln(err)
	} else {
		e2epod.ImagePrePullList.Insert(samplePluginImage)
	}
	if samplePluginImageCtrlReg, err := getContainerImageFromE2ETestDaemonset(SampleDevicePluginControlRegistrationDSYAML); err != nil {
		klog.Errorln(err)
	} else {
		e2epod.ImagePrePullList.Insert(samplePluginImageCtrlReg)
	}
}

func getNodeProblemDetectorImage() string {
@@ -222,19 +227,19 @@ func getGPUDevicePluginImage(ctx context.Context) (string, error) {
	return ds.Spec.Template.Spec.Containers[0].Image, nil
}

func getSampleDevicePluginImage() (string, error) {
	data, err := e2etestfiles.Read(SampleDevicePluginDSYAML)
func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) {
	data, err := e2etestfiles.Read(dsYamlPath)
	if err != nil {
		return "", fmt.Errorf("failed to read the sample plugin yaml: %w", err)
		return "", fmt.Errorf("failed to read the daemonset yaml: %w", err)
	}

	ds, err := e2emanifest.DaemonSetFromData(data)
	if err != nil {
		return "", fmt.Errorf("failed to parse daemon set for sample plugin: %w", err)
		return "", fmt.Errorf("failed to parse daemonset yaml: %w", err)
	}

	if len(ds.Spec.Template.Spec.Containers) < 1 {
		return "", fmt.Errorf("failed to parse the sample plugin image: cannot extract the container from YAML")
		return "", fmt.Errorf("failed to parse the container image: cannot extract the container from YAML")
	}
	return ds.Spec.Template.Spec.Containers[0].Image, nil
}
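The assertion for step 9 relies on the `numberOfDevicesCapacity` and `numberOfDevicesAllocatable` helpers, which the test calls but which are not part of this diff. As a rough illustration only (the real helpers live elsewhere in the e2e suite and may differ), such a check can be expressed against the node status roughly like this:

```go
package e2enode

import (
	v1 "k8s.io/api/core/v1"
)

// devicesCapacitySketch returns how many units of the given extended resource
// (for example, the sample device plugin's resource) the node reports in its capacity.
// Illustrative sketch only; not the helper used by the test.
func devicesCapacitySketch(node *v1.Node, resourceName string) int64 {
	q, ok := node.Status.Capacity[v1.ResourceName(resourceName)]
	if !ok {
		return 0 // resource not advertised at all: treated as zero
	}
	return q.Value()
}

// devicesAllocatableSketch does the same for the node's allocatable resources.
func devicesAllocatableSketch(node *v1.Node, resourceName string) int64 {
	q, ok := node.Status.Allocatable[v1.ResourceName(resourceName)]
	if !ok {
		return 0
	}
	return q.Value()
}
```

When the device plugin has not re-registered after the kubelet restart, both values drop to zero and the pending test pod is rejected with `UnexpectedAdmissionError`, which is exactly what the test above asserts.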