From 3bcf4220ece998d626ae670f911f8a1a1bb31507 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Fri, 9 Jun 2023 16:49:58 +0200
Subject: [PATCH] kubelet: devices: skip allocation for running pods

When the kubelet initializes, it runs admission for pods and possibly
allocates the requested resources. We need to distinguish between a
node reboot (no containers running) and a kubelet restart (containers
potentially still running).

Running pods should always survive a kubelet restart. This means that
device allocation on admission should not be attempted, because if a
container requires devices and is still running when the kubelet
restarts, that container already has its devices allocated and working.

Thus, we need to properly detect this scenario in the allocation step
and handle it explicitly. We need to inform the devicemanager about
which pods are already running.

Note that if the container runtime is down when the kubelet restarts,
the approach implemented here won't work. In this scenario, containers
will again fail admission on kubelet restart, hitting
https://github.com/kubernetes/kubernetes/issues/118559 again. This
scenario should however be pretty rare.

Signed-off-by: Francesco Romani
---
 pkg/kubelet/cm/container_manager_linux.go    | 28 ++-------
 pkg/kubelet/cm/container_manager_windows.go  |  6 +-
 pkg/kubelet/cm/devicemanager/manager.go      | 60 +++++++++++++++++++-
 pkg/kubelet/cm/devicemanager/manager_test.go |  7 ++-
 pkg/kubelet/cm/devicemanager/types.go        |  4 +-
 pkg/kubelet/cm/helpers.go                    | 32 +++++++++++
 6 files changed, 108 insertions(+), 29 deletions(-)

diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 02cb34ddcdc..78a31b19c63 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -51,7 +51,6 @@ import (
 	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
-	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
@@ -563,8 +562,9 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 	localStorageCapacityIsolation bool) error {
 	ctx := context.Background()
 
+	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
+
 	// Initialize CPU manager
-	containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
 	err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
 	if err != nil {
 		return fmt.Errorf("start cpu manager error: %v", err)
@@ -572,7 +572,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 
 	// Initialize memory manager
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
-		containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
+		containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
 		err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
 		if err != nil {
 			return fmt.Errorf("start memory manager error: %v", err)
@@ -636,7 +636,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 	}
 
 	// Starts device manager.
-	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
+	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
 		return err
 	}
 
@@ -699,26 +699,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
 	}
 }
 
-func buildContainerMapFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) containermap.ContainerMap {
-	podSandboxMap := make(map[string]string)
-	podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
-	for _, p := range podSandboxList {
-		podSandboxMap[p.Id] = p.Metadata.Uid
-	}
-
-	containerMap := containermap.NewContainerMap()
-	containerList, _ := runtimeService.ListContainers(ctx, nil)
-	for _, c := range containerList {
-		if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
-			klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
-			continue
-		}
-		containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
-	}
-
-	return containerMap
-}
-
 func isProcessRunningInHost(pid int) (bool, error) {
 	// Get init pid namespace.
 	initPidNs, err := os.Readlink("/proc/1/ns/pid")
diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go
index c26a0683769..f62944aafcd 100644
--- a/pkg/kubelet/cm/container_manager_windows.go
+++ b/pkg/kubelet/cm/container_manager_windows.go
@@ -23,6 +23,7 @@ limitations under the License.
 package cm
 
 import (
+	"context"
 	"fmt"
 
 	"k8s.io/klog/v2"
@@ -86,8 +87,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 		}
 	}
 
+	ctx := context.Background()
+	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
+
 	// Starts device manager.
-	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
+	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
 		return err
 	}
 
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 7499de4460f..fed9ba4bb89 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -36,6 +36,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
 	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -97,6 +98,15 @@ type ManagerImpl struct {
 
 	// pendingAdmissionPod contain the pod during the admission phase
 	pendingAdmissionPod *v1.Pod
+
+	// containerMap provides a mapping from (pod, container) -> containerID
+	// for all containers in a pod. Used to detect pods running across a restart
+	containerMap containermap.ContainerMap
+
+	// containerRunningSet identifies which containers among those present in `containerMap`
+	// were reported running by the container runtime when `containerMap` was computed.
+	// Used to detect pods running across a restart
+	containerRunningSet sets.String
 }
 
 type endpointInfo struct {
@@ -277,11 +287,13 @@ func (m *ManagerImpl) checkpointFile() string {
 // Start starts the Device Plugin Manager and start initialization of
 // podDevices and allocatedDevices information from checkpointed state and
 // starts device plugin registration service.
-func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
+func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
 	klog.V(2).InfoS("Starting Device Plugin manager")
 
 	m.activePods = activePods
 	m.sourcesReady = sourcesReady
+	m.containerMap = initialContainers
+	m.containerRunningSet = initialContainerRunningSet
 
 	// Loads in allocatedDevices information from disk.
 	err := m.readCheckpoint()
@@ -545,10 +557,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
 		}
 	}
 
+	// We have 3 major flows to handle:
+	// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
+	// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
+	// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
+	// note: if we get this far, the runtime is surely running. This is usually enforced at the OS level by startup service dependencies.
+
+	// First we take care of the exceptional flows (scenarios 2 and 3). In both, the kubelet is reinitializing, and while it is initializing, sources are NOT all ready.
+	// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for containers reported
+	// running, then it can only be a kubelet restart. On node reboot the runtime and the containers were shut down as well. So, if the container was running, it can only be
+	// because it already has access to all the required devices; there is nothing to do and we can bail out.
+	if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
+		klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
+		return nil, nil
+	}
+
+	// We dealt with scenario 2. If we got this far, it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
 	klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
 	healthyDevices, hasRegistered := m.healthyDevices[resource]
 
-	// Check if resource registered with devicemanager
+	// The following checks are expected to fail only in scenario 3 (node reboot).
+	// The kubelet is reinitializing and got a container from its sources. But there's no ordering guarantee, so an app container may attempt allocation _before_ the device plugin was created,
+	// registered, and reported its devices back to the kubelet.
+	// This can only happen in scenario 3 because at steady state (scenario 1) the scheduler prevents pods from being sent to nodes which don't report enough devices.
+	// Note: we need to check the device health and registration status *before* we check how many devices are needed; doing otherwise caused issue #109595
+	// Note: if the scheduler is bypassed, we fall back into scenario 1, so we still need these checks.
 	if !hasRegistered {
 		return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
 	}
@@ -563,7 +596,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
 		return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
 	}
 
+	// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back into a common path.
+	// We cover container restarts at kubelet steady state with the same flow.
 	if needed == 0 {
+		klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
 		// No change, no work.
 		return nil, nil
 	}
@@ -1040,3 +1076,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
 
 	m.pendingAdmissionPod = pod
 }
+
+func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
+	cntID, err := m.containerMap.GetContainerID(podUID, cntName)
+	if err != nil {
+		klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
+		return false
+	}
+
+	// note that if the container runtime is down when the kubelet restarts, this set will be empty,
+	// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
+	// This scenario should however be rare enough.
+	if !m.containerRunningSet.Has(cntID) {
+		klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
+		return false
+	}
+
+	// Once we make it here, we know we have a running container.
+	klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
+	return true
+}
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 1ac9cb55674..6601a1e7b85 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -41,6 +41,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
 	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -284,7 +285,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
 		return []*v1.Pod{}
 	}
 
-	err = w.Start(activePods, &sourcesReadyStub{})
+	// test steady state; initialization where sourcesReady, containerMap and containerRunningSet
+	// are relevant will be tested with a different flow
+	err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString())
 	require.NoError(t, err)
 
 	return w, updateChan
@@ -1012,6 +1015,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) {
 		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       newPodDevices(),
+		activePods:       func() []*v1.Pod { return []*v1.Pod{} },
+		sourcesReady:     &sourcesReadyStub{},
 	}
 
 	testManager.podDevices.insert("pod1", "con1", resourceName1,
diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go
index d508e8c9969..eb1c64548eb 100644
--- a/pkg/kubelet/cm/devicemanager/types.go
+++ b/pkg/kubelet/cm/devicemanager/types.go
@@ -20,6 +20,8 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -31,7 +33,7 @@ import (
 // Manager manages all the Device Plugins running on a node.
 type Manager interface {
 	// Start starts device plugin registration service.
-	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
+	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error
 
 	// Allocate configures and assigns devices to a container in a pod. From
 	// the requested device resources, Allocate will communicate with the
diff --git a/pkg/kubelet/cm/helpers.go b/pkg/kubelet/cm/helpers.go
index dbbea4a8040..6be3e272307 100644
--- a/pkg/kubelet/cm/helpers.go
+++ b/pkg/kubelet/cm/helpers.go
@@ -17,7 +17,14 @@ limitations under the License.
 package cm
 
 import (
+	"context"
+
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	internalapi "k8s.io/cri-api/pkg/apis"
+	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 )
 
@@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
 	}
 	return ret
 }
+
+func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
+	podSandboxMap := make(map[string]string)
+	podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
+	for _, p := range podSandboxList {
+		podSandboxMap[p.Id] = p.Metadata.Uid
+	}
+
+	runningSet := sets.NewString()
+	containerMap := containermap.NewContainerMap()
+	containerList, _ := runtimeService.ListContainers(ctx, nil)
+	for _, c := range containerList {
+		if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
+			klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
+			continue
+		}
+		podUID := podSandboxMap[c.PodSandboxId]
+		containerMap.Add(podUID, c.Metadata.Name, c.Id)
+		if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
+			klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
+			runningSet.Insert(c.Id)
+		}
+	}
+	return containerMap, runningSet
+}
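
Below is a minimal, self-contained Go sketch (not part of the patch; it uses simplified stand-in types rather than the real containermap.ContainerMap and sets.String, and the names restartState and shouldSkipAllocation are illustrative only) of the decision the patch adds to devicesToAllocate: while the kubelet is still reinitializing (sources not yet all ready), allocation is skipped for a container the runtime already reported as running, since such a container must already hold its devices.

package main

import "fmt"

// containerKey is a simplified stand-in for the (podUID, containerName) key
// tracked by containermap.ContainerMap.
type containerKey struct{ podUID, containerName string }

type restartState struct {
	sourcesReady bool                    // true once all kubelet config sources have synced
	containerIDs map[containerKey]string // (pod, container) -> containerID, rebuilt from the runtime at startup
	running      map[string]bool         // container IDs the runtime reported as running at startup
}

// shouldSkipAllocation mirrors the early return added to devicesToAllocate:
// while the kubelet is reinitializing, a container the runtime already reports
// as running must already hold its devices, so admission does not try to
// allocate for it again.
func (s *restartState) shouldSkipAllocation(podUID, containerName string) bool {
	if s.sourcesReady {
		return false // steady state: normal allocation path
	}
	id, ok := s.containerIDs[containerKey{podUID, containerName}]
	return ok && s.running[id]
}

func main() {
	s := &restartState{
		sourcesReady: false, // kubelet restart in progress
		containerIDs: map[containerKey]string{{podUID: "pod-1", containerName: "app"}: "cnt-123"},
		running:      map[string]bool{"cnt-123": true},
	}
	fmt.Println(s.shouldSkipAllocation("pod-1", "app"))   // true: container survived the restart, skip allocation
	fmt.Println(s.shouldSkipAllocation("pod-2", "other")) // false: unknown container, allocate normally
}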