kubelet: devices: skip allocation for running pods

When kubelet initializes, runs admission for pods and possibly
allocated requested resources. We need to distinguish between
node reboot (no containers running) versus kubelet restart (containers
potentially running).

Running pods should always survive kubelet restart.
This means that device allocation on admission should not be attempted,
because if a container requires devices and is still running when kubelet
is restarting, that container already has devices allocated and working.

Thus, we need to properly detect this scenario in the allocation step
and handle it explicitely. We need to inform
the devicemanager about which pods are already running.

Note that if container runtime is down when kubelet restarts, the
approach implemented here won't work. In this scenario, so on kubelet
restart containers will again fail admission, hitting
https://github.com/kubernetes/kubernetes/issues/118559 again.
This scenario should however be pretty rare.

Signed-off-by: Francesco Romani <fromani@redhat.com>
This commit is contained in:
Francesco Romani 2023-06-09 16:49:58 +02:00
parent 988094878e
commit 3bcf4220ec
6 changed files with 108 additions and 29 deletions

View File

@ -51,7 +51,6 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/dra"
@ -563,8 +562,9 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
localStorageCapacityIsolation bool) error { localStorageCapacityIsolation bool) error {
ctx := context.Background() ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Initialize CPU manager // Initialize CPU manager
containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil { if err != nil {
return fmt.Errorf("start cpu manager error: %v", err) return fmt.Errorf("start cpu manager error: %v", err)
@ -572,7 +572,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
// Initialize memory manager // Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap := buildContainerMapFromRuntime(ctx, runtimeService) containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil { if err != nil {
return fmt.Errorf("start memory manager error: %v", err) return fmt.Errorf("start memory manager error: %v", err)
@ -636,7 +636,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
} }
// Starts device manager. // Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil { if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err return err
} }
@ -699,26 +699,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
} }
} }
func buildContainerMapFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) containermap.ContainerMap {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(ctx, nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
}
return containerMap
}
func isProcessRunningInHost(pid int) (bool, error) { func isProcessRunningInHost(pid int) (bool, error) {
// Get init pid namespace. // Get init pid namespace.
initPidNs, err := os.Readlink("/proc/1/ns/pid") initPidNs, err := os.Readlink("/proc/1/ns/pid")

View File

@ -23,6 +23,7 @@ limitations under the License.
package cm package cm
import ( import (
"context"
"fmt" "fmt"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -86,8 +87,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
} }
} }
ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Starts device manager. // Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil { if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err return err
} }

View File

@ -36,6 +36,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@ -97,6 +98,15 @@ type ManagerImpl struct {
// pendingAdmissionPod contain the pod during the admission phase // pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod pendingAdmissionPod *v1.Pod
// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod. Used to detect pods running across a restart
containerMap containermap.ContainerMap
// containerRunningSet identifies which container among those present in `containerMap`
// was reported running by the container runtime when `containerMap` was computed.
// Used to detect pods running across a restart
containerRunningSet sets.String
} }
type endpointInfo struct { type endpointInfo struct {
@ -277,11 +287,13 @@ func (m *ManagerImpl) checkpointFile() string {
// Start starts the Device Plugin Manager and start initialization of // Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and // podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service. // starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
klog.V(2).InfoS("Starting Device Plugin manager") klog.V(2).InfoS("Starting Device Plugin manager")
m.activePods = activePods m.activePods = activePods
m.sourcesReady = sourcesReady m.sourcesReady = sourcesReady
m.containerMap = initialContainers
m.containerRunningSet = initialContainerRunningSet
// Loads in allocatedDevices information from disk. // Loads in allocatedDevices information from disk.
err := m.readCheckpoint() err := m.readCheckpoint()
@ -545,10 +557,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
} }
} }
// We have 3 major flows to handle:
// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and most common case by far and large.
// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
// note: if we get this far the runtime is surely running. This is usually enforced at OS level by startup system services dependencies.
// First we take care of the exceptional flow (scenarios 2 and 3). In both flows, kubelet is reinitializing, and while kubelet is initializing, sources are NOT all ready.
// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the informations we got for runtime. If we are asked to allocate devices for containers reported
// running, then it can only be a kubelet restart. On node reboot the runtime and the containers were also shut down. Then, if the container was running, it can only be
// because it already has access to all the required devices, so we got nothing to do and we can bail out.
if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
return nil, nil
}
// We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName) klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
healthyDevices, hasRegistered := m.healthyDevices[resource] healthyDevices, hasRegistered := m.healthyDevices[resource]
// Check if resource registered with devicemanager // The following checks are expected to fail only happen on scenario 3 (node reboot).
// The kubelet is reinitializing and got a container from sources. But there's no ordering, so an app container may attempt allocation _before_ the device plugin was created,
// has registered and reported back to kubelet the devices.
// This can only happen on scenario 3 because at steady state (scenario 1) the scheduler prevents pod to be sent towards node which don't report enough devices.
// Note: we need to check the device health and registration status *before* we check how many devices are needed, doing otherwise caused issue #109595
// Note: if the scheduler is bypassed, we fall back in scenario 1, so we still need these checks.
if !hasRegistered { if !hasRegistered {
return nil, fmt.Errorf("cannot allocate unregistered device %s", resource) return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
} }
@ -563,7 +596,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource) return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
} }
// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back in a common path.
// We cover container restart on kubelet steady state with the same flow.
if needed == 0 { if needed == 0 {
klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
// No change, no work. // No change, no work.
return nil, nil return nil, nil
} }
@ -1040,3 +1076,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
m.pendingAdmissionPod = pod m.pendingAdmissionPod = pod
} }
func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
if err != nil {
klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
return false
}
// note that if container runtime is down when kubelet restarts, this set will be empty,
// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
// This scenario should however be rare enough.
if !m.containerRunningSet.Has(cntID) {
klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return false
}
// Once we make it here we know we have a running container.
klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return true
}

View File

@ -41,6 +41,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@ -284,7 +285,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
return []*v1.Pod{} return []*v1.Pod{}
} }
err = w.Start(activePods, &sourcesReadyStub{}) // test steady state, initialization where sourcesReady, containerMap and containerRunningSet
// are relevant will be tested with a different flow
err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString())
require.NoError(t, err) require.NoError(t, err)
return w, updateChan return w, updateChan
@ -1012,6 +1015,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) {
unhealthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(), podDevices: newPodDevices(),
activePods: func() []*v1.Pod { return []*v1.Pod{} },
sourcesReady: &sourcesReadyStub{},
} }
testManager.podDevices.insert("pod1", "con1", resourceName1, testManager.podDevices.insert("pod1", "con1", resourceName1,

View File

@ -20,6 +20,8 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -31,7 +33,7 @@ import (
// Manager manages all the Device Plugins running on a node. // Manager manages all the Device Plugins running on a node.
type Manager interface { type Manager interface {
// Start starts device plugin registration service. // Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error
// Allocate configures and assigns devices to a container in a pod. From // Allocate configures and assigns devices to a container in a pod. From
// the requested device resources, Allocate will communicate with the // the requested device resources, Allocate will communicate with the

View File

@ -17,7 +17,14 @@ limitations under the License.
package cm package cm
import ( import (
"context"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
) )
@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
} }
return ret return ret
} }
func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}
runningSet := sets.NewString()
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(ctx, nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
podUID := podSandboxMap[c.PodSandboxId]
containerMap.Add(podUID, c.Metadata.Name, c.Id)
if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
runningSet.Insert(c.Id)
}
}
return containerMap, runningSet
}