node: device-plugin: add node reboot test scenario

Add a test suite to simulate node reboot (achieved by removing pods
using the CRI API before the kubelet is restarted).

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
Swati Sehgal 2023-04-26 17:41:14 +01:00
parent a26f4d855d
commit 3dbb741c97


@@ -18,7 +18,9 @@ package e2enode
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
@@ -32,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
admissionapi "k8s.io/pod-security-admission/api"
@@ -43,6 +47,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
)
var (
@@ -55,6 +60,7 @@ var _ = SIGDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:Devic
f := framework.NewDefaultFramework("device-plugin-errors")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
testDevicePlugin(f, kubeletdevicepluginv1beta1.DevicePluginPath)
testDevicePluginNodeReboot(f, kubeletdevicepluginv1beta1.DevicePluginPath)
})
// readDaemonSetV1OrDie reads daemonset object from bytes. Panics on error.
@@ -414,6 +420,202 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) {
})
}
func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) {
ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() {
var devicePluginPod *v1.Pod
var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse
var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse
var triggerPathFile, triggerPathDir string
var err error
ginkgo.BeforeEach(func(ctx context.Context) {
ginkgo.By("Wait for node to be ready")
gomega.Eventually(ctx, func(ctx context.Context) bool {
nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
// Before we run the device plugin test, we need to ensure
// that the cluster is in a clean state and there are no
// pods running on this node.
// This is done in a gomega.Eventually with retries, since a prior test in a different test suite
// could have run and the deletion of its resources may still be in progress.
// xref: https://issue.k8s.io/115381
gomega.Eventually(ctx, func(ctx context.Context) error {
v1alphaPodResources, err = getV1alpha1NodeDevices(ctx)
if err != nil {
return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err)
}
v1PodResources, err = getV1NodeDevices(ctx)
if err != nil {
return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err)
}
if len(v1alphaPodResources.PodResources) > 0 {
return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources)
}
if len(v1PodResources.PodResources) > 0 {
return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources)
}
return nil
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed())
ginkgo.By("Setting up the directory and file for controlling registration")
triggerPathDir = filepath.Join(devicePluginDir, "sample")
if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(triggerPathDir, os.ModePerm)
if err != nil {
klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err)
panic(err)
}
klog.InfoS("Directory created successfully")
triggerPathFile = filepath.Join(triggerPathDir, "registration")
if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) {
_, err = os.Create(triggerPathFile)
if err != nil {
klog.Errorf("File creation %s failed: %v ", triggerPathFile, err)
panic(err)
}
}
}
ginkgo.By("Scheduling a sample device plugin pod")
data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML)
if err != nil {
framework.Fail(err.Error())
}
ds := readDaemonSetV1OrDie(data)
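// Build a standalone pod from the DaemonSet's pod template: the suite targets a single node,
// so a plain pod is sufficient and keeps the plugin's lifecycle under the test's direct control.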
dp := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: SampleDevicePluginName,
},
Spec: ds.Spec.Template.Spec,
}
devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp)
go func() {
// Since autoregistration is disabled for the device plugin (the REGISTER_CONTROL_FILE
// environment variable is specified), device plugin registration needs to be triggered
// manually.
// This is done by deleting the control file at the following path:
// `/var/lib/kubelet/device-plugins/sample/registration`.
defer ginkgo.GinkgoRecover()
framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile)
err := os.Remove(triggerPathFile)
framework.ExpectNoError(err)
}()
ginkgo.By("Waiting for devices to become available on the local node")
gomega.Eventually(ctx, func(ctx context.Context) bool {
node, ready := getLocalTestNode(ctx, f)
return ready && CountSampleDeviceCapacity(node) > 0
}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
framework.Logf("Successfully created device plugin pod")
ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount))
gomega.Eventually(ctx, func(ctx context.Context) bool {
node, ready := getLocalTestNode(ctx, f)
return ready &&
CountSampleDeviceCapacity(node) == expectedSampleDevsAmount &&
CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount
}, 30*time.Second, framework.Poll).Should(gomega.BeTrue())
})
ginkgo.AfterEach(func(ctx context.Context) {
ginkgo.By("Deleting the device plugin pod")
e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute)
ginkgo.By("Deleting any Pods created by the test")
l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err)
for _, p := range l.Items {
if p.Namespace != f.Namespace.Name {
continue
}
framework.Logf("Deleting pod: %s", p.Name)
e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute)
}
err = os.Remove(triggerPathDir)
framework.ExpectNoError(err)
ginkgo.By("Waiting for devices to become unavailable on the local node")
gomega.Eventually(ctx, func(ctx context.Context) bool {
node, ready := getLocalTestNode(ctx, f)
return ready && CountSampleDeviceCapacity(node) <= 0
}, 5*time.Minute, framework.Poll).Should(gomega.BeTrue())
ginkgo.By("devices now unavailable on the local node")
})
// Simulate a node reboot scenario by removing pods using CRI before the kubelet is restarted. In addition,
// a scenario is intentionally created where, after the node reboot, application pods requesting devices appear
// before the device plugin pod exposing those devices as a resource has re-registered itself with the kubelet.
// The expected behavior is that the application pod fails at admission time.
ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) {
podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever)
pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD))
deviceIDRE := "stub devices: (Dev-[0-9]+)"
devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE)
framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name)
gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")))
pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
ginkgo.By("stopping the kubelet")
startKubelet := stopKubelet()
ginkgo.By("stopping all the local containers - using CRI")
rs, _, err := getCRIClient()
framework.ExpectNoError(err)
sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{})
framework.ExpectNoError(err)
for _, sandbox := range sandboxes {
gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil())
ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id))
err := rs.RemovePodSandbox(ctx, sandbox.Id)
framework.ExpectNoError(err)
}
ginkgo.By("restarting the kubelet")
startKubelet()
ginkgo.By("Wait for node to be ready again")
e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute)
ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet")
gomega.Eventually(ctx, getPod).
WithArguments(f, pod1.Name).
WithTimeout(time.Minute).
Should(HaveFailedWithAdmissionError(),
"the pod succeeded to start, when it should fail with the admission error")
// Crosscheck that the device assignment is preserved and stable from the kubelet's perspective.
// Note that we don't check the container logs again: the check was done at startup, and the container
// never restarted (it runs "forever" on this test's timescale), hence re-doing the check
// would be pointless.
ginkgo.By("Verifying the device assignment after kubelet restart using podresources API")
gomega.Eventually(ctx, func() error {
v1PodResources, err = getV1NodeDevices(ctx)
return err
}, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart")
err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1})
framework.ExpectNoError(err, "inconsistent device assignment after node reboot")
})
})
}
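Note: checkPodResourcesAssignment, used above, is a helper defined elsewhere in this file and not shown in this hunk. A minimal sketch of the kind of lookup it performs, assuming the podresources v1 types already imported here, could look like the following (the name and structure are illustrative, not the actual helper):

// Illustrative sketch only: scan the v1 podresources response for the given pod and
// container, and verify that the expected device IDs are assigned for the named resource.
func checkPodResourcesAssignmentSketch(resp *kubeletpodresourcesv1.ListPodResourcesResponse, ns, podName, cntName, resourceName string, expectedIDs []string) error {
	for _, podRes := range resp.GetPodResources() {
		if podRes.GetNamespace() != ns || podRes.GetName() != podName {
			continue
		}
		for _, cnt := range podRes.GetContainers() {
			if cnt.GetName() != cntName {
				continue
			}
			for _, dev := range cnt.GetDevices() {
				if dev.GetResourceName() != resourceName {
					continue
				}
				if sets.NewString(dev.GetDeviceIds()...).Equal(sets.NewString(expectedIDs...)) {
					return nil
				}
				return fmt.Errorf("pod %s/%s container %s: got device IDs %v, expected %v", ns, podName, cntName, dev.GetDeviceIds(), expectedIDs)
			}
		}
	}
	return fmt.Errorf("no %s assignment found for pod %s/%s container %s", resourceName, ns, podName, cntName)
}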
// makeBusyboxPod returns a simple Pod spec with a busybox container
// that requests SampleDeviceResourceName and runs the specified command.
func makeBusyboxPod(SampleDeviceResourceName, cmd string) *v1.Pod {