From 3bcf4220ece998d626ae670f911f8a1a1bb31507 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Fri, 9 Jun 2023 16:49:58 +0200 Subject: [PATCH 1/5] kubelet: devices: skip allocation for running pods When kubelet initializes, runs admission for pods and possibly allocated requested resources. We need to distinguish between node reboot (no containers running) versus kubelet restart (containers potentially running). Running pods should always survive kubelet restart. This means that device allocation on admission should not be attempted, because if a container requires devices and is still running when kubelet is restarting, that container already has devices allocated and working. Thus, we need to properly detect this scenario in the allocation step and handle it explicitely. We need to inform the devicemanager about which pods are already running. Note that if container runtime is down when kubelet restarts, the approach implemented here won't work. In this scenario, so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again. This scenario should however be pretty rare. Signed-off-by: Francesco Romani --- pkg/kubelet/cm/container_manager_linux.go | 28 ++------- pkg/kubelet/cm/container_manager_windows.go | 6 +- pkg/kubelet/cm/devicemanager/manager.go | 60 +++++++++++++++++++- pkg/kubelet/cm/devicemanager/manager_test.go | 7 ++- pkg/kubelet/cm/devicemanager/types.go | 4 +- pkg/kubelet/cm/helpers.go | 32 +++++++++++ 6 files changed, 108 insertions(+), 29 deletions(-) diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 02cb34ddcdc..78a31b19c63 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -51,7 +51,6 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" - "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/dra" @@ -563,8 +562,9 @@ func (cm *containerManagerImpl) Start(node *v1.Node, localStorageCapacityIsolation bool) error { ctx := context.Background() + containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService) + // Initialize CPU manager - containerMap := buildContainerMapFromRuntime(ctx, runtimeService) err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) if err != nil { return fmt.Errorf("start cpu manager error: %v", err) @@ -572,7 +572,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node, // Initialize memory manager if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) { - containerMap := buildContainerMapFromRuntime(ctx, runtimeService) + containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService) err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) if err != nil { return fmt.Errorf("start memory manager error: %v", err) @@ -636,7 +636,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node, } // Starts device manager. - if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil { + if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil { return err } @@ -699,26 +699,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { } } -func buildContainerMapFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) containermap.ContainerMap { - podSandboxMap := make(map[string]string) - podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil) - for _, p := range podSandboxList { - podSandboxMap[p.Id] = p.Metadata.Uid - } - - containerMap := containermap.NewContainerMap() - containerList, _ := runtimeService.ListContainers(ctx, nil) - for _, c := range containerList { - if _, exists := podSandboxMap[c.PodSandboxId]; !exists { - klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id) - continue - } - containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id) - } - - return containerMap -} - func isProcessRunningInHost(pid int) (bool, error) { // Get init pid namespace. initPidNs, err := os.Readlink("/proc/1/ns/pid") diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index c26a0683769..f62944aafcd 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -23,6 +23,7 @@ limitations under the License. package cm import ( + "context" "fmt" "k8s.io/klog/v2" @@ -86,8 +87,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node, } } + ctx := context.Background() + containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService) + // Starts device manager. - if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil { + if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil { return err } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 7499de4460f..fed9ba4bb89 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -36,6 +36,7 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -97,6 +98,15 @@ type ManagerImpl struct { // pendingAdmissionPod contain the pod during the admission phase pendingAdmissionPod *v1.Pod + + // containerMap provides a mapping from (pod, container) -> containerID + // for all containers in a pod. Used to detect pods running across a restart + containerMap containermap.ContainerMap + + // containerRunningSet identifies which container among those present in `containerMap` + // was reported running by the container runtime when `containerMap` was computed. + // Used to detect pods running across a restart + containerRunningSet sets.String } type endpointInfo struct { @@ -277,11 +287,13 @@ func (m *ManagerImpl) checkpointFile() string { // Start starts the Device Plugin Manager and start initialization of // podDevices and allocatedDevices information from checkpointed state and // starts device plugin registration service. -func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { +func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error { klog.V(2).InfoS("Starting Device Plugin manager") m.activePods = activePods m.sourcesReady = sourcesReady + m.containerMap = initialContainers + m.containerRunningSet = initialContainerRunningSet // Loads in allocatedDevices information from disk. err := m.readCheckpoint() @@ -545,10 +557,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi } } + // We have 3 major flows to handle: + // 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and most common case by far and large. + // 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running. + // 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices. + // note: if we get this far the runtime is surely running. This is usually enforced at OS level by startup system services dependencies. + + // First we take care of the exceptional flow (scenarios 2 and 3). In both flows, kubelet is reinitializing, and while kubelet is initializing, sources are NOT all ready. + // Is this a simple kubelet restart (scenario 2)? To distinguish, we use the informations we got for runtime. If we are asked to allocate devices for containers reported + // running, then it can only be a kubelet restart. On node reboot the runtime and the containers were also shut down. Then, if the container was running, it can only be + // because it already has access to all the required devices, so we got nothing to do and we can bail out. + if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) { + klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName) + return nil, nil + } + + // We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow). klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName) healthyDevices, hasRegistered := m.healthyDevices[resource] - // Check if resource registered with devicemanager + // The following checks are expected to fail only happen on scenario 3 (node reboot). + // The kubelet is reinitializing and got a container from sources. But there's no ordering, so an app container may attempt allocation _before_ the device plugin was created, + // has registered and reported back to kubelet the devices. + // This can only happen on scenario 3 because at steady state (scenario 1) the scheduler prevents pod to be sent towards node which don't report enough devices. + // Note: we need to check the device health and registration status *before* we check how many devices are needed, doing otherwise caused issue #109595 + // Note: if the scheduler is bypassed, we fall back in scenario 1, so we still need these checks. if !hasRegistered { return nil, fmt.Errorf("cannot allocate unregistered device %s", resource) } @@ -563,7 +596,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource) } + // We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back in a common path. + // We cover container restart on kubelet steady state with the same flow. if needed == 0 { + klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName) // No change, no work. return nil, nil } @@ -1040,3 +1076,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) { m.pendingAdmissionPod = pod } + +func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool { + cntID, err := m.containerMap.GetContainerID(podUID, cntName) + if err != nil { + klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err) + return false + } + + // note that if container runtime is down when kubelet restarts, this set will be empty, + // so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again. + // This scenario should however be rare enough. + if !m.containerRunningSet.Has(cntID) { + klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID) + return false + } + + // Once we make it here we know we have a running container. + klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID) + return true +} diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 1ac9cb55674..6601a1e7b85 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -41,6 +41,7 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -284,7 +285,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor return []*v1.Pod{} } - err = w.Start(activePods, &sourcesReadyStub{}) + // test steady state, initialization where sourcesReady, containerMap and containerRunningSet + // are relevant will be tested with a different flow + err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString()) require.NoError(t, err) return w, updateChan @@ -1012,6 +1015,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) { unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), podDevices: newPodDevices(), + activePods: func() []*v1.Pod { return []*v1.Pod{} }, + sourcesReady: &sourcesReadyStub{}, } testManager.podDevices.insert("pod1", "con1", resourceName1, diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index d508e8c9969..eb1c64548eb 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -20,6 +20,8 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -31,7 +33,7 @@ import ( // Manager manages all the Device Plugins running on a node. type Manager interface { // Start starts device plugin registration service. - Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error + Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error // Allocate configures and assigns devices to a container in a pod. From // the requested device resources, Allocate will communicate with the diff --git a/pkg/kubelet/cm/helpers.go b/pkg/kubelet/cm/helpers.go index dbbea4a8040..6be3e272307 100644 --- a/pkg/kubelet/cm/helpers.go +++ b/pkg/kubelet/cm/helpers.go @@ -17,7 +17,14 @@ limitations under the License. package cm import ( + "context" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + internalapi "k8s.io/cri-api/pkg/apis" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" ) @@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res } return ret } + +func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) { + podSandboxMap := make(map[string]string) + podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil) + for _, p := range podSandboxList { + podSandboxMap[p.Id] = p.Metadata.Uid + } + + runningSet := sets.NewString() + containerMap := containermap.NewContainerMap() + containerList, _ := runtimeService.ListContainers(ctx, nil) + for _, c := range containerList { + if _, exists := podSandboxMap[c.PodSandboxId]; !exists { + klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id) + continue + } + podUID := podSandboxMap[c.PodSandboxId] + containerMap.Add(podUID, c.Metadata.Name, c.Id) + if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING { + klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id) + runningSet.Insert(c.Id) + } + } + return containerMap, runningSet +} From b926aba2689f5f89de9a13e3a647aab7ee0aa108 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Thu, 15 Jun 2023 17:41:42 +0200 Subject: [PATCH 2/5] e2e: node: devicemanager: update tests Fix e2e device manager tests. Most notably, the workload pods needs to survive a kubelet restart. Update tests to reflect that. Signed-off-by: Francesco Romani --- test/e2e_node/device_manager_test.go | 9 +- test/e2e_node/device_plugin_test.go | 157 +++++++++++++++------------ 2 files changed, 93 insertions(+), 73 deletions(-) diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go index 95434849d34..d38b3981154 100644 --- a/test/e2e_node/device_manager_test.go +++ b/test/e2e_node/device_manager_test.go @@ -721,11 +721,14 @@ func hasFailed(hasFailed bool) types.GomegaMatcher { }).WithTemplate("expected Pod failed {{.To}} be in {{format .Data}}\nGot instead:\n{{.FormattedActual}}").WithTemplateData(hasFailed) } -func getPod(ctx context.Context, f *framework.Framework, podName string) (bool, error) { +func getPodByName(ctx context.Context, f *framework.Framework, podName string) (*v1.Pod, error) { + return e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{}) +} - pod, err := e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{}) +func getPod(ctx context.Context, f *framework.Framework, podName string) (bool, error) { + pod, err := getPodByName(ctx, f, podName) if err != nil { - return false, fmt.Errorf("expected node to get pod=%q got err=%q", pod.Name, err) + return false, err } expectedStatusReason := "UnexpectedAdmissionError" diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index 5b3d9c750ed..aa653a583e6 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -28,11 +28,14 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" + "github.com/onsi/gomega/types" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" @@ -42,6 +45,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubectl/pkg/util/podutils" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" "k8s.io/kubernetes/test/e2e/framework" @@ -239,7 +243,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { // simulate container restart, while all other involved components (kubelet, device plugin) stay stable. To do so, in the container // entry point we sleep for a limited and short period of time. The device assignment should be kept and be stable across the container // restarts. For the sake of brevity we however check just the fist restart. - ginkgo.It("Keeps device plugin assignments across pod restarts (no kubelet restart, device plugin re-registration)", func(ctx context.Context) { + ginkgo.It("Keeps device plugin assignments across pod restarts (no kubelet restart, no device plugin restart)", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) deviceIDRE := "stub devices: (Dev-[0-9]+)" @@ -291,10 +295,11 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) - // simulate kubelet restart, *but not* device plugin re-registration, while the pod and the container stays running. + // simulate kubelet restart. A compliant device plugin is expected to re-register, while the pod and the container stays running. + // The flow with buggy or slow device plugin is deferred to another test. // The device assignment should be kept and be stable across the kubelet restart, because it's the kubelet which performs the device allocation, // and both the device plugin and the actual consumer (container) are stable. - ginkgo.It("Keeps device plugin assignments across kubelet restarts (no pod restart, no device plugin re-registration)", func(ctx context.Context) { + ginkgo.It("Keeps device plugin assignments across kubelet restarts (no pod restart, no device plugin restart)", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) deviceIDRE := "stub devices: (Dev-[0-9]+)" @@ -304,6 +309,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) framework.ExpectNoError(err) + framework.Logf("testing pod: UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1)) ginkgo.By("Restarting Kubelet") restartKubelet(true) @@ -319,12 +325,16 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") - gomega.Eventually(ctx, getPod). + ginkgo.By("Checking the same instance of the pod is still running") + gomega.Eventually(ctx, getPodByName). WithArguments(f, pod1.Name). WithTimeout(time.Minute). - Should(HaveFailedWithAdmissionError(), - "the pod succeeded to start, when it should fail with the admission error") + Should(BeTheSamePodStillRunning(pod1), + "the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts") + + pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + framework.Logf("testing pod: UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1)) // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. // note we don't check again the logs of the container: the check is done at startup, the container @@ -340,10 +350,10 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { framework.ExpectNoError(err, "inconsistent device assignment after pod restart") }) - // simulate kubelet and container restart, *but not* device plugin re-registration. + // simulate kubelet and container restart, *but not* device plugin restart. // The device assignment should be kept and be stable across the kubelet and container restart, because it's the kubelet which // performs the device allocation, and both the device plugin is stable. - ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin re-registration)", func(ctx context.Context) { + ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin restart)", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) deviceIDRE := "stub devices: (Dev-[0-9]+)" @@ -372,18 +382,28 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.By("Wait for node to be ready again") e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) - ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") - gomega.Eventually(ctx, getPod). + ginkgo.By("Checking an instance of the pod is running") + gomega.Eventually(ctx, getPodByName). WithArguments(f, pod1.Name). WithTimeout(time.Minute). - Should(HaveFailedWithAdmissionError(), - "the pod succeeded to start, when it should fail with the admission error") + Should(gomega.And( + BeAPodInPhase(v1.PodRunning), + BeAPodReady(), + ), + "the pod should still be running, the workload should not be perturbed by kubelet restarts") - // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. - // note we don't check again the logs of the container: the check is done at startup, the container - // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check - // is useless. - ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") + ginkgo.By("Verifying the device assignment after pod and kubelet restart using container logs") + var devID1Restarted string + gomega.Eventually(ctx, func() string { + devID1Restarted, err = parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) + if err != nil { + framework.Logf("error getting logds for pod %q: %v", pod1.Name, err) + return "" + } + return devID1Restarted + }, 30*time.Second, framework.Poll).Should(gomega.Equal(devID1), "pod %s reports a different device after restarts: %s (expected %s)", pod1.Name, devID1Restarted, devID1) + + ginkgo.By("Verifying the device assignment after pod and kubelet restart using podresources API") gomega.Eventually(ctx, func() error { v1PodResources, err = getV1NodeDevices(ctx) return err @@ -397,8 +417,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { // After the device plugin has re-registered, the list healthy devices is repopulated based on the devices discovered. // Once Pod2 is running we determine the device that was allocated it. As long as the device allocation succeeds the // test should pass. - - ginkgo.It("Keeps device plugin assignments after the device plugin has been re-registered (no kubelet, pod restart)", func(ctx context.Context) { + ginkgo.It("Keeps device plugin assignments after the device plugin has restarted (no kubelet restart, pod restart)", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) deviceIDRE := "stub devices: (Dev-[0-9]+)" @@ -433,12 +452,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - // crosscheck that after device plugin re-registration the device assignment is preserved and + // crosscheck that after device plugin restart the device assignment is preserved and // stable from the kubelet's perspective. // note we don't check again the logs of the container: the check is done at startup, the container // never restarted (runs "forever" from this test timescale perspective) hence re-doing this check // is useless. - ginkgo.By("Verifying the device assignment after device plugin re-registration using podresources API") + ginkgo.By("Verifying the device assignment after device plugin restart using podresources API") gomega.Eventually(ctx, func() error { v1PodResources, err = getV1NodeDevices(ctx) return err @@ -446,30 +465,19 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after pod restart") - - ginkgo.By("Creating another pod") - pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) - err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod2.Name, f.Namespace.Name, 1*time.Minute) - framework.ExpectNoError(err) - - ginkgo.By("Checking that pod got a fake device") - devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) - framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) - - gomega.Expect(devID2).To(gomega.Not(gomega.Equal("")), "pod2 requested a device but started successfully without") }) - // simulate kubelet restart *and* device plugin re-registration, while the pod and the container stays running. + // simulate kubelet restart *and* device plugin restart, while the pod and the container stays running. // The device assignment should be kept and be stable across the kubelet/device plugin restart, as both the aforementioned components // orchestrate the device allocation: the actual consumer (container) is stable. - ginkgo.It("Keeps device plugin assignments after kubelet restart and device plugin has been re-registered (no pod restart)", func(ctx context.Context) { + ginkgo.It("Keeps device plugin assignments after kubelet restart and device plugin restart (no pod restart)", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) // the pod has to run "forever" in the timescale of this test pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) deviceIDRE := "stub devices: (Dev-[0-9]+)" devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) - gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") + gomega.Expect(devID1).To(gomega.Not(gomega.BeEmpty()), "pod1 requested a device but started successfully without") pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) framework.ExpectNoError(err) @@ -480,12 +488,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.By("Wait for node to be ready again") e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) - ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") - gomega.Eventually(ctx, getPod). + ginkgo.By("Checking the same instance of the pod is still running after kubelet restart") + gomega.Eventually(ctx, getPodByName). WithArguments(f, pod1.Name). WithTimeout(time.Minute). - Should(HaveFailedWithAdmissionError(), - "the pod succeeded to start, when it should fail with the admission error") + Should(BeTheSamePodStillRunning(pod1), + "the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts") // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. // note we don't check again the logs of the container: the check is done at startup, the container @@ -511,7 +519,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { ginkgo.By("Recreating the plugin pod") devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) - ginkgo.By("Waiting for resource to become available on the local node after re-registration") + ginkgo.By("Waiting for resource to become available on the local node after restart") gomega.Eventually(ctx, func() bool { node, ready := getLocalTestNode(ctx, f) return ready && @@ -519,22 +527,12 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) - ginkgo.By("Creating another pod") - pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) - - ginkgo.By("Checking that pod got a fake device") - devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) - framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) - - ginkgo.By("Verifying the device assignment after kubelet restart and device plugin re-registration using podresources API") - // note we don't use eventually: the kubelet is supposed to be running and stable by now, so the call should just succeed - v1PodResources, err = getV1NodeDevices(ctx) - if err != nil { - framework.ExpectNoError(err, "getting pod resources assignment after pod restart") - } - - err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2}) - framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") + ginkgo.By("Checking the same instance of the pod is still running after the device plugin restart") + gomega.Eventually(ctx, getPodByName). + WithArguments(f, pod1.Name). + WithTimeout(time.Minute). + Should(BeTheSamePodStillRunning(pod1), + "the same pod instance not running across kubelet restarts, workload should not be perturbed by device plugins restarts") }) }) } @@ -542,10 +540,10 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { ginkgo.Context("DevicePlugin [Serial] [Disruptive]", func() { var devicePluginPod *v1.Pod - var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse var triggerPathFile, triggerPathDir string var err error + ginkgo.BeforeEach(func(ctx context.Context) { ginkgo.By("Wait for node to be ready") gomega.Eventually(ctx, func(ctx context.Context) bool { @@ -560,25 +558,16 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { // This is done in a gomega.Eventually with retries since a prior test in a different test suite could've run and the deletion of it's resources may still be in progress. // xref: https://issue.k8s.io/115381 gomega.Eventually(ctx, func(ctx context.Context) error { - v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) - if err != nil { - return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err) - } - v1PodResources, err = getV1NodeDevices(ctx) if err != nil { return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err) } - if len(v1alphaPodResources.PodResources) > 0 { - return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources) - } - if len(v1PodResources.PodResources) > 0 { return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) } return nil - }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) + }, f.Timeouts.SystemDaemonsetStartup, f.Timeouts.Poll).Should(gomega.Succeed()) ginkgo.By("Setting up the directory and file for controlling registration") triggerPathDir = filepath.Join(devicePluginDir, "sample") @@ -675,8 +664,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { // simulate node reboot scenario by removing pods using CRI before kubelet is started. In addition to that, // intentionally a scenario is created where after node reboot, application pods requesting devices appear before the device plugin pod - // exposing those devices as resource has re-registers itself to Kubelet. The expected behavior is that the application pod fails at - // admission time. + // exposing those devices as resource has restarted. The expected behavior is that the application pod fails at admission time. ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) { podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) @@ -851,3 +839,32 @@ func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { return dp } + +func BeTheSamePodStillRunning(expected *v1.Pod) types.GomegaMatcher { + return gomega.And( + BeTheSamePodAs(expected.UID), + BeAPodInPhase(v1.PodRunning), + BeAPodReady(), + ) +} + +// BeReady matches if the pod is reported ready +func BeAPodReady() types.GomegaMatcher { + return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) { + return podutils.IsPodReady(actual), nil + }).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} UID {{.Actual.UID}} not ready yet") +} + +// BeAPodInPhase matches if the pod is running +func BeAPodInPhase(phase v1.PodPhase) types.GomegaMatcher { + return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) { + return actual.Status.Phase == phase, nil + }).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} failed {{.To}} be in phase {{.Data}} instead is in phase {{.Actual.Status.Phase}}").WithTemplateData(phase) +} + +// BeTheSamePodAs matches if the pod has the given UID +func BeTheSamePodAs(podUID k8stypes.UID) types.GomegaMatcher { + return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) { + return actual.UID == podUID, nil + }).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} expected UID {{.Data}} has UID instead {{.Actual.UID}}").WithTemplateData(podUID) +} From 5cf50105a2b58ae5660d68df729b8a609fa01536 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Wed, 21 Jun 2023 19:20:05 +0200 Subject: [PATCH 3/5] e2e: node: devices: improve the node reboot test The recently added e2e device plugins test to cover node reboot works fine if runs every time on CI environment (e.g CI) but doesn't handle correctly partial setup when run repeatedly on the same instance (developer setup). To accomodate both flows, we extend the error management, checking more error conditions in the flow. Signed-off-by: Francesco Romani --- test/e2e_node/device_plugin_test.go | 43 +++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index aa653a583e6..d750a70d6fa 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -38,7 +38,6 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" admissionapi "k8s.io/pod-security-admission/api" @@ -569,30 +568,40 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { return nil }, f.Timeouts.SystemDaemonsetStartup, f.Timeouts.Poll).Should(gomega.Succeed()) - ginkgo.By("Setting up the directory and file for controlling registration") + ginkgo.By("Setting up the directory for controlling registration") triggerPathDir = filepath.Join(devicePluginDir, "sample") - if _, err := os.Stat(triggerPathDir); errors.Is(err, os.ErrNotExist) { - err := os.Mkdir(triggerPathDir, os.ModePerm) - if err != nil { - klog.Errorf("Directory creation %s failed: %v ", triggerPathDir, err) - panic(err) - } - klog.InfoS("Directory created successfully") - - triggerPathFile = filepath.Join(triggerPathDir, "registration") - if _, err := os.Stat(triggerPathFile); errors.Is(err, os.ErrNotExist) { - _, err = os.Create(triggerPathFile) - if err != nil { - klog.Errorf("File creation %s failed: %v ", triggerPathFile, err) - panic(err) + if _, err := os.Stat(triggerPathDir); err != nil { + if errors.Is(err, os.ErrNotExist) { + if err := os.Mkdir(triggerPathDir, os.ModePerm); err != nil { + framework.Fail(fmt.Sprintf("registration control directory %q creation failed: %v ", triggerPathDir, err)) } + framework.Logf("registration control directory created successfully") + } else { + framework.Fail(fmt.Sprintf("unexpected error checking %q: %v", triggerPathDir, err)) } + } else { + framework.Logf("registration control directory %q already present", triggerPathDir) + } + + ginkgo.By("Setting up the file trigger for controlling registration") + triggerPathFile = filepath.Join(triggerPathDir, "registration") + if _, err := os.Stat(triggerPathFile); err != nil { + if errors.Is(err, os.ErrNotExist) { + if _, err = os.Create(triggerPathFile); err != nil { + framework.Fail(fmt.Sprintf("registration control file %q creation failed: %v", triggerPathFile, err)) + } + framework.Logf("registration control file created successfully") + } else { + framework.Fail(fmt.Sprintf("unexpected error creating %q: %v", triggerPathFile, err)) + } + } else { + framework.Logf("registration control file %q already present", triggerPathFile) } ginkgo.By("Scheduling a sample device plugin pod") data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML) if err != nil { - framework.Fail(err.Error()) + framework.Fail(fmt.Sprintf("error reading test data %q: %v", SampleDevicePluginControlRegistrationDSYAML, err)) } ds := readDaemonSetV1OrDie(data) From d78671447f22203e13b83eb03dabe728718fdaaf Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Wed, 21 Jun 2023 19:20:58 +0200 Subject: [PATCH 4/5] e2e: node: add test to check device-requiring pods are cleaned up Make sure orphanded pods (pods deleted while kubelet is down) are handled correctly. Outline: 1. create a pod (not static pod) 2. stop kubelet 3. while kubelet is down, force delete the pod on API server 4. restart kubelet the pod becomes an orphaned pod and is expected to be killed by HandlePodCleanups. There is a similar test already, but here we want to check device assignment. Signed-off-by: Francesco Romani --- test/e2e_node/device_plugin_test.go | 90 ++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 16 deletions(-) diff --git a/test/e2e_node/device_plugin_test.go b/test/e2e_node/device_plugin_test.go index d750a70d6fa..4cbf558505f 100644 --- a/test/e2e_node/device_plugin_test.go +++ b/test/e2e_node/device_plugin_test.go @@ -269,7 +269,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { if err != nil { framework.ExpectNoError(err, "getting pod resources assignment after pod restart") } - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after pod restart") ginkgo.By("Creating another pod") @@ -288,9 +288,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { if err != nil { framework.ExpectNoError(err, "getting pod resources assignment after pod restart") } - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod1") - err = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2}) framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") }) @@ -308,7 +308,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) framework.ExpectNoError(err) - framework.Logf("testing pod: UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1)) + framework.Logf("testing pod: pre-restart UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1)) ginkgo.By("Restarting Kubelet") restartKubelet(true) @@ -331,9 +331,9 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { Should(BeTheSamePodStillRunning(pod1), "the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts") - pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) + pod2, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) framework.ExpectNoError(err) - framework.Logf("testing pod: UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1)) + framework.Logf("testing pod: post-restart UID=%s namespace=%s name=%s ready=%v", pod2.UID, pod2.Namespace, pod2.Name, podutils.IsPodReady(pod2)) // crosscheck from the device assignment is preserved and stable from perspective of the kubelet. // note we don't check again the logs of the container: the check is done at startup, the container @@ -345,7 +345,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { return err }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after pod restart") }) @@ -408,7 +408,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { return err }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after pod restart") }) @@ -462,7 +462,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { return err }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after pod restart") }) @@ -504,7 +504,7 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { return err }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after pod restart") ginkgo.By("Re-Register resources by deleting the plugin pod") @@ -533,6 +533,62 @@ func testDevicePlugin(f *framework.Framework, pluginSockDir string) { Should(BeTheSamePodStillRunning(pod1), "the same pod instance not running across kubelet restarts, workload should not be perturbed by device plugins restarts") }) + + ginkgo.It("[OrphanedPods] Ensures pods consuming devices deleted while kubelet is down are cleaned up correctly", func(ctx context.Context) { + podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) + pod := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) + + deviceIDRE := "stub devices: (Dev-[0-9]+)" + devID, err := parseLog(ctx, f, pod.Name, pod.Name, deviceIDRE) + framework.ExpectNoError(err, "getting logs for pod %q", pod.Name) + gomega.Expect(devID).To(gomega.Not(gomega.BeEmpty()), "pod1 requested a device but started successfully without") + + pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("stopping the kubelet") + startKubelet := stopKubelet() + + // wait until the kubelet health check will fail + gomega.Eventually(ctx, func() bool { + ok := kubeletHealthCheck(kubeletHealthCheckURL) + framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok) + return ok + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse()) + + framework.Logf("Delete the pod while the kubelet is not running") + // Delete pod sync by name will force delete the pod, removing it from kubelet's config + deletePodSyncByName(ctx, f, pod.Name) + + framework.Logf("Starting the kubelet") + startKubelet() + + // wait until the kubelet health check will succeed + gomega.Eventually(ctx, func() bool { + ok := kubeletHealthCheck(kubeletHealthCheckURL) + framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok) + return ok + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue()) + + framework.Logf("wait for the pod %v to disappear", pod.Name) + gomega.Eventually(ctx, func(ctx context.Context) error { + err := checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace) + framework.Logf("pod %s/%s disappear check err=%v", pod.Namespace, pod.Name, err) + return err + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil()) + + waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace) + + ginkgo.By("Verifying the device assignment after device plugin restart using podresources API") + gomega.Eventually(ctx, func() error { + v1PodResources, err = getV1NodeDevices(ctx) + return err + }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") + err, allocated := checkPodResourcesAssignment(v1PodResources, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, SampleDeviceResourceName, []string{}) + if err == nil || allocated { + framework.Fail(fmt.Sprintf("stale device assignment after pod deletion while kubelet was down allocated=%v error=%v", allocated, err)) + } + }) }) } @@ -725,7 +781,7 @@ func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { return err }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") - err = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) + err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) framework.ExpectNoError(err, "inconsistent device assignment after node reboot") }) @@ -793,7 +849,7 @@ func parseLog(ctx context.Context, f *framework.Framework, podName string, contN return matches[1], nil } -func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) error { +func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) (error, bool) { for _, podRes := range v1PodRes.PodResources { if podRes.Namespace != podNamespace || podRes.Name != podName { continue @@ -805,10 +861,12 @@ func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResource return matchContainerDevices(podNamespace+"/"+podName+"/"+containerName, contRes.Devices, resourceName, devs) } } - return fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName) + err := fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName) + framework.Logf("%v", err) + return err, false } -func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) error { +func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) (error, bool) { expected := sets.New[string](devs...) assigned := sets.New[string]() for _, contDev := range contDevs { @@ -821,9 +879,9 @@ func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.Conta assignedStr := strings.Join(assigned.UnsortedList(), ",") framework.Logf("%s: devices expected %q assigned %q", ident, expectedStr, assignedStr) if !assigned.Equal(expected) { - return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr) + return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr), true } - return nil + return nil, true } // getSampleDevicePluginPod returns the Sample Device Plugin pod to be used e2e tests. From c635a7e7d8362ac7c706680e77f7680895b1d517 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 11 Jul 2023 18:54:51 +0200 Subject: [PATCH 5/5] node: devicemgr: topomgr: add logs One of the contributing factors of issues #118559 and #109595 hard to debug and fix is that the devicemanager has very few logs in important flow, so it's unnecessarily hard to reconstruct the state from logs. We add minimal logs to be able to improve troubleshooting. We add minimal logs to be backport-friendly, deferring a more comprehensive review of logging to later PRs. Signed-off-by: Francesco Romani --- pkg/kubelet/cm/devicemanager/manager.go | 4 ++++ pkg/kubelet/cm/topologymanager/topology_manager.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index fed9ba4bb89..d780ee801bd 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -226,6 +226,7 @@ func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin defer m.mutex.Unlock() m.endpoints[resourceName] = endpointInfo{e, options} + klog.V(2).InfoS("Device plugin connected", "resourceName", resourceName) return nil } @@ -256,6 +257,7 @@ func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *plug } func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) { + healthyCount := 0 m.mutex.Lock() m.healthyDevices[resourceName] = sets.NewString() m.unhealthyDevices[resourceName] = sets.NewString() @@ -264,6 +266,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [ m.allDevices[resourceName][dev.ID] = dev if dev.Health == pluginapi.Healthy { m.healthyDevices[resourceName].Insert(dev.ID) + healthyCount++ } else { m.unhealthyDevices[resourceName].Insert(dev.ID) } @@ -272,6 +275,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [ if err := m.writeCheckpoint(); err != nil { klog.ErrorS(err, "Writing checkpoint encountered") } + klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount) } // GetWatcherHandler returns the plugin handler diff --git a/pkg/kubelet/cm/topologymanager/topology_manager.go b/pkg/kubelet/cm/topologymanager/topology_manager.go index 567736e82d3..35f3e3e4715 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager.go @@ -209,7 +209,7 @@ func (m *manager) RemoveContainer(containerID string) error { } func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { - klog.InfoS("Topology Admit Handler") + klog.InfoS("Topology Admit Handler", "podUID", attrs.Pod.UID, "podNamespace", attrs.Pod.Namespace, "podName", attrs.Pod.Name) metrics.TopologyManagerAdmissionRequestsTotal.Inc() startTime := time.Now()