diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 02cb34ddcdc..78a31b19c63 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -51,7 +51,6 @@ import (
 	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
-	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
@@ -563,8 +562,9 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 	localStorageCapacityIsolation bool) error {
 	ctx := context.Background()
 
+	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
+
 	// Initialize CPU manager
-	containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
 	err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
 	if err != nil {
 		return fmt.Errorf("start cpu manager error: %v", err)
@@ -572,7 +572,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 
 	// Initialize memory manager
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
-		containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
+		containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
 		err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
 		if err != nil {
 			return fmt.Errorf("start memory manager error: %v", err)
@@ -636,7 +636,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 	}
 
 	// Starts device manager.
-	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
+	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
 		return err
 	}
 
@@ -699,26 +699,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
 	}
 }
 
-func buildContainerMapFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) containermap.ContainerMap {
-	podSandboxMap := make(map[string]string)
-	podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
-	for _, p := range podSandboxList {
-		podSandboxMap[p.Id] = p.Metadata.Uid
-	}
-
-	containerMap := containermap.NewContainerMap()
-	containerList, _ := runtimeService.ListContainers(ctx, nil)
-	for _, c := range containerList {
-		if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
-			klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
-			continue
-		}
-		containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
-	}
-
-	return containerMap
-}
-
 func isProcessRunningInHost(pid int) (bool, error) {
 	// Get init pid namespace.
 	initPidNs, err := os.Readlink("/proc/1/ns/pid")
diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go
index c26a0683769..f62944aafcd 100644
--- a/pkg/kubelet/cm/container_manager_windows.go
+++ b/pkg/kubelet/cm/container_manager_windows.go
@@ -23,6 +23,7 @@ limitations under the License.
 package cm
 
 import (
+	"context"
 	"fmt"
 
 	"k8s.io/klog/v2"
@@ -86,8 +87,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 		}
 	}
 
+	ctx := context.Background()
+	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
+
 	// Starts device manager.
-	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
+	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
 		return err
 	}
 
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 7499de4460f..fed9ba4bb89 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -36,6 +36,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
 	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -97,6 +98,15 @@ type ManagerImpl struct {
 
 	// pendingAdmissionPod contain the pod during the admission phase
 	pendingAdmissionPod *v1.Pod
+
+	// containerMap provides a mapping from (pod, container) -> containerID
+	// for all containers in a pod. Used to detect pods running across a restart
+	containerMap containermap.ContainerMap
+
+	// containerRunningSet identifies which containers among those present in `containerMap`
+	// were reported running by the container runtime when `containerMap` was computed.
+	// Used to detect pods running across a restart
+	containerRunningSet sets.String
 }
 
 type endpointInfo struct {
@@ -277,11 +287,13 @@ func (m *ManagerImpl) checkpointFile() string {
 // Start starts the Device Plugin Manager and start initialization of
 // podDevices and allocatedDevices information from checkpointed state and
 // starts device plugin registration service.
-func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
+func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
 	klog.V(2).InfoS("Starting Device Plugin manager")
 
 	m.activePods = activePods
 	m.sourcesReady = sourcesReady
+	m.containerMap = initialContainers
+	m.containerRunningSet = initialContainerRunningSet
 
 	// Loads in allocatedDevices information from disk.
 	err := m.readCheckpoint()
@@ -545,10 +557,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
 		}
 	}
 
+	// We have 3 major flows to handle:
+	// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
+	// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
+	// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
+	//    note: if we get this far the runtime is surely running. This is usually enforced at the OS level by startup ordering of the system services.
+
+	// First we take care of the exceptional flows (scenarios 2 and 3). In both of them the kubelet is reinitializing, and while it is initializing, sources are NOT all ready.
+	// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for a container reported
+	// as running, it can only be a kubelet restart: on node reboot the runtime and the containers were shut down as well. And if the container is running, it can only be
+	// because it already has access to all the devices it needs, so there is nothing to do and we can bail out.
+	if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
+		klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
+		return nil, nil
+	}
+
+	// We dealt with scenario 2. If we got this far, it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
 	klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
 	healthyDevices, hasRegistered := m.healthyDevices[resource]
 
-	// Check if resource registered with devicemanager
+	// The following checks are expected to fail only in scenario 3 (node reboot).
+	// The kubelet is reinitializing and got a container from its sources, but there is no ordering guarantee, so an app container may attempt allocation _before_ the device plugin
+	// has been created, registered and reported its devices back to the kubelet.
+	// This can only happen in scenario 3 because at steady state (scenario 1) the scheduler prevents pods from being sent to nodes which don't report enough devices.
+	// Note: we need to check the device health and registration status *before* we check how many devices are needed; doing otherwise caused issue #109595.
+	// Note: if the scheduler is bypassed, this can also happen in scenario 1, so we still need these checks.
 	if !hasRegistered {
 		return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
 	}
@@ -563,7 +596,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
 		return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
 	}
 
+	// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back to a common path.
+	// We cover container restart at kubelet steady state with the same flow.
 	if needed == 0 {
+		klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
 		// No change, no work.
 		return nil, nil
 	}
@@ -1040,3 +1076,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
 
 	m.pendingAdmissionPod = pod
 }
+
+func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
+	cntID, err := m.containerMap.GetContainerID(podUID, cntName)
+	if err != nil {
+		klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
+		return false
+	}
+
+	// Note that if the container runtime is down when the kubelet restarts, this set will be empty,
+	// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
+	// This scenario should however be rare enough.
+	if !m.containerRunningSet.Has(cntID) {
+		klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
+		return false
+	}
+
+	// Once we make it here we know we have a running container.
+ klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID) + return true +} diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 1ac9cb55674..6601a1e7b85 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -41,6 +41,7 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -284,7 +285,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor return []*v1.Pod{} } - err = w.Start(activePods, &sourcesReadyStub{}) + // test steady state, initialization where sourcesReady, containerMap and containerRunningSet + // are relevant will be tested with a different flow + err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString()) require.NoError(t, err) return w, updateChan @@ -1012,6 +1015,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) { unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), podDevices: newPodDevices(), + activePods: func() []*v1.Pod { return []*v1.Pod{} }, + sourcesReady: &sourcesReadyStub{}, } testManager.podDevices.insert("pod1", "con1", resourceName1, diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index d508e8c9969..eb1c64548eb 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -20,6 +20,8 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -31,7 +33,7 @@ import ( // Manager manages all the Device Plugins running on a node. type Manager interface { // Start starts device plugin registration service. - Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error + Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error // Allocate configures and assigns devices to a container in a pod. From // the requested device resources, Allocate will communicate with the diff --git a/pkg/kubelet/cm/helpers.go b/pkg/kubelet/cm/helpers.go index dbbea4a8040..6be3e272307 100644 --- a/pkg/kubelet/cm/helpers.go +++ b/pkg/kubelet/cm/helpers.go @@ -17,7 +17,14 @@ limitations under the License. 
 package cm
 
 import (
+	"context"
+
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	internalapi "k8s.io/cri-api/pkg/apis"
+	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 )
 
@@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
 	}
 	return ret
 }
+
+func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
+	podSandboxMap := make(map[string]string)
+	podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
+	for _, p := range podSandboxList {
+		podSandboxMap[p.Id] = p.Metadata.Uid
+	}
+
+	runningSet := sets.NewString()
+	containerMap := containermap.NewContainerMap()
+	containerList, _ := runtimeService.ListContainers(ctx, nil)
+	for _, c := range containerList {
+		if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
+			klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
+			continue
+		}
+		podUID := podSandboxMap[c.PodSandboxId]
+		containerMap.Add(podUID, c.Metadata.Name, c.Id)
+		if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
+			klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
+			runningSet.Insert(c.Id)
+		}
+	}
+	return containerMap, runningSet
+}
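
Note for reviewers, outside the patch itself: the standalone sketch below mirrors what buildContainerMapAndRunningSetFromRuntime computes, but with simplified stand-in structs instead of the CRI runtimeapi.PodSandbox and runtimeapi.Container types, and plain maps instead of containermap.ContainerMap and sets.String, so it can be compiled and run in isolation. The names sandbox, container and buildSnapshot are illustrative only and do not exist in the kubelet.

package main

import "fmt"

// Simplified stand-ins for the CRI ListPodSandbox / ListContainers results;
// the real helper works on runtimeapi.PodSandbox and runtimeapi.Container.
type sandbox struct {
	id     string
	podUID string
}

type container struct {
	id        string
	name      string
	sandboxID string
	running   bool // the real code checks runtimeapi.ContainerState_CONTAINER_RUNNING
}

// buildSnapshot mirrors buildContainerMapAndRunningSetFromRuntime: first index
// the sandboxes by ID to recover each container's pod UID, then walk the
// containers, skipping any whose sandbox is unknown, and record which ones
// were reported running at the time of the listing.
func buildSnapshot(sandboxes []sandbox, containers []container) (map[[2]string]string, map[string]bool) {
	podUIDBySandbox := make(map[string]string)
	for _, s := range sandboxes {
		podUIDBySandbox[s.id] = s.podUID
	}

	containerIDs := make(map[[2]string]string) // (podUID, containerName) -> containerID
	running := make(map[string]bool)           // containerIDs reported running
	for _, c := range containers {
		podUID, ok := podUIDBySandbox[c.sandboxID]
		if !ok {
			continue // no sandbox found for the container; same skip as the real helper
		}
		containerIDs[[2]string{podUID, c.name}] = c.id
		if c.running {
			running[c.id] = true
		}
	}
	return containerIDs, running
}

func main() {
	containerIDs, running := buildSnapshot(
		[]sandbox{{id: "sb-1", podUID: "pod-uid-1"}},
		[]container{
			{id: "cnt-1", name: "app", sandboxID: "sb-1", running: true},
			{id: "cnt-9", name: "orphan", sandboxID: "sb-gone", running: true}, // skipped: unknown sandbox
		},
	)
	fmt.Println(containerIDs) // map[[pod-uid-1 app]:cnt-1]
	fmt.Println(running)      // map[cnt-1:true]
}

The property the device manager relies on is that this snapshot is taken once, at container manager startup, before any pod source is ready.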
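A second standalone sketch, under the same caveats (simplified stand-in types, hypothetical names such as snapshot and canSkipAllocation), shows how that snapshot feeds the new short-circuit in devicesToAllocate: allocation is skipped only while the kubelet's sources are not yet all ready and the container was already reported running by the runtime.

package main

import "fmt"

// containerKey mirrors the (podUID, containerName) key that
// containermap.ContainerMap resolves to a containerID.
type containerKey struct {
	podUID        string
	containerName string
}

// snapshot is a simplified stand-in for the data the device manager now
// receives at Start(): the containerID map and the running set.
type snapshot struct {
	containerIDs map[containerKey]string // (podUID, containerName) -> containerID
	running      map[string]bool         // containerIDs reported running
}

// isContainerAlreadyRunning mirrors ManagerImpl.isContainerAlreadyRunning:
// a container counts as "already running" only if it was known to the runtime
// AND reported running when the snapshot was taken.
func (s snapshot) isContainerAlreadyRunning(podUID, containerName string) bool {
	cntID, ok := s.containerIDs[containerKey{podUID, containerName}]
	if !ok {
		return false // not known to the runtime at kubelet startup
	}
	return s.running[cntID]
}

// canSkipAllocation mirrors the new short-circuit in devicesToAllocate:
// while sources are not yet all ready (kubelet restart, scenario 2), a
// container already running keeps its devices and needs no new allocation.
func canSkipAllocation(sourcesAllReady bool, s snapshot, podUID, containerName string) bool {
	return !sourcesAllReady && s.isContainerAlreadyRunning(podUID, containerName)
}

func main() {
	s := snapshot{
		containerIDs: map[containerKey]string{
			{"pod-uid-1", "app"}: "cnt-1",
			{"pod-uid-2", "app"}: "cnt-2",
		},
		running: map[string]bool{"cnt-1": true}, // cnt-2 exists but is not running
	}

	// Kubelet restart (sources not ready): a running container skips allocation.
	fmt.Println(canSkipAllocation(false, s, "pod-uid-1", "app")) // true
	// Same restart, but the container is not running: normal allocation path.
	fmt.Println(canSkipAllocation(false, s, "pod-uid-2", "app")) // false
	// Steady state (sources ready): never skip, normal allocation path.
	fmt.Println(canSkipAllocation(true, s, "pod-uid-1", "app")) // false
}

This is the behavioral change of the patch: a kubelet restart (scenario 2) no longer re-runs device allocation for containers that survived it, while node reboot (scenario 3) and the steady state (scenario 1) still go through the usual checks.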