Fix nil pointer access panic in kubelet from uninitialized pod allocation checkpoint manager in standalone kubelet scenario

This commit is contained in:
vinay kulkarni 2023-03-04 08:07:40 +00:00
parent 8da8bb41bc
commit 12435b26fc
3 changed files with 86 additions and 42 deletions

View File

@ -2105,21 +2105,30 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin
// TODO: out of resource eviction should have a pod admitter call-out // TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods} attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// If pod resource allocation checkpoint manager (checkpointState) is nil, it is likely because
// kubelet is bootstrapping kube-system pods on master node. In this scenario, pod resource resize
// is neither expected nor can be handled. Fall back to regular 'canAdmitPod' processing.
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate if we can toss out all this checkpointing
// code and instead rely on ResourceAllocation / Resize values persisted in PodStatus. (Ref: KEP 2527)
// Use allocated resources values from checkpoint store (source of truth) to determine fit // Use allocated resources values from checkpoint store (source of truth) to determine fit
otherPods := make([]*v1.Pod, 0, len(pods))
checkpointState := kl.statusManager.State() checkpointState := kl.statusManager.State()
for _, p := range pods { if checkpointState != nil {
op := p.DeepCopy() otherPods := make([]*v1.Pod, 0, len(pods))
for _, c := range op.Spec.Containers { for _, p := range pods {
resourcesAllocated, found := checkpointState.GetContainerResourceAllocation(string(p.UID), c.Name) op := p.DeepCopy()
if c.Resources.Requests != nil && found { for _, c := range op.Spec.Containers {
c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU] resourcesAllocated, found := checkpointState.GetContainerResourceAllocation(string(p.UID), c.Name)
c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory] if c.Resources.Requests != nil && found {
c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU]
c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory]
}
} }
otherPods = append(otherPods, op)
} }
otherPods = append(otherPods, op) attrs.OtherPods = otherPods
} else {
klog.ErrorS(nil, "pod resource allocation checkpoint manager is not initialized.")
} }
attrs.OtherPods = otherPods
} }
for _, podAdmitHandler := range kl.admitHandlers { for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit { if result := podAdmitHandler.Admit(attrs); !result.Admit {
@ -2404,28 +2413,39 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
activePods := kl.filterOutInactivePods(existingPods) activePods := kl.filterOutInactivePods(existingPods)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// To handle kubelet restarts, test pod admissibility using ResourcesAllocated values // If pod resource allocation checkpoint manager (checkpointState) is nil, it is likely because
// (for cpu & memory) from checkpoint store. If found, that is the source of truth. // kubelet is bootstrapping kube-system pods on master node. In this scenario, pod resource resize
// is neither expected nor can be handled. Fall back to regular 'canAdmitPod' processing.
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate if we can toss out all this checkpointing
// code and instead rely on ResourceAllocation / Resize values persisted in PodStatus. (Ref: KEP 2527)
checkpointState := kl.statusManager.State() checkpointState := kl.statusManager.State()
podCopy := pod.DeepCopy() if checkpointState != nil {
for _, c := range podCopy.Spec.Containers { // To handle kubelet restarts, test pod admissibility using ResourcesAllocated values
resourcesAllocated, found := checkpointState.GetContainerResourceAllocation(string(pod.UID), c.Name) // (for cpu & memory) from checkpoint store. If found, that is the source of truth.
if c.Resources.Requests != nil && found { podCopy := pod.DeepCopy()
c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU] for _, c := range podCopy.Spec.Containers {
c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory] resourcesAllocated, found := checkpointState.GetContainerResourceAllocation(string(pod.UID), c.Name)
if c.Resources.Requests != nil && found {
c.Resources.Requests[v1.ResourceCPU] = resourcesAllocated[v1.ResourceCPU]
c.Resources.Requests[v1.ResourceMemory] = resourcesAllocated[v1.ResourceMemory]
}
}
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok {
kl.rejectPod(pod, reason, message)
continue
}
// For new pod, checkpoint the resource values at which the Pod has been admitted
if err := kl.statusManager.SetPodAllocation(podCopy); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
}
} else {
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
} }
}
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok {
kl.rejectPod(pod, reason, message)
continue
}
// For new pod, checkpoint the resource values at which the Pod has been admitted
if err := kl.statusManager.SetPodAllocation(podCopy); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
} }
} else { } else {
// Check if we can admit the pod; if not, reject it. // Check if we can admit the pod; if not, reject it.

View File

@ -1473,8 +1473,12 @@ func (kl *Kubelet) determinePodResizeStatus(pod *v1.Pod, podStatus *v1.PodStatus
} }
} else { } else {
checkpointState := kl.statusManager.State() checkpointState := kl.statusManager.State()
if resizeStatus, found := checkpointState.GetPodResizeStatus(string(pod.UID)); found { if checkpointState != nil {
podResizeStatus = resizeStatus if resizeStatus, found := checkpointState.GetPodResizeStatus(string(pod.UID)); found {
podResizeStatus = resizeStatus
}
} else {
klog.ErrorS(nil, "pod resource allocation checkpoint manager is not initialized.")
} }
} }
return podResizeStatus return podResizeStatus
@ -1770,7 +1774,11 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
// ResourcesAllocated values come from checkpoint. It is the source-of-truth. // ResourcesAllocated values come from checkpoint. It is the source-of-truth.
found := false found := false
checkpointState := kl.statusManager.State() checkpointState := kl.statusManager.State()
status.ResourcesAllocated, found = checkpointState.GetContainerResourceAllocation(string(pod.UID), cName) if checkpointState != nil {
status.ResourcesAllocated, found = checkpointState.GetContainerResourceAllocation(string(pod.UID), cName)
} else {
klog.ErrorS(nil, "pod resource allocation checkpoint manager is not initialized.")
}
if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found { if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found {
// Log error and fallback to ResourcesAllocated in oldStatus if it exists // Log error and fallback to ResourcesAllocated in oldStatus if it exists
klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName) klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName)

View File

@ -183,6 +183,17 @@ func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
} }
func (m *manager) Start() { func (m *manager) Start() {
// Create pod allocation checkpoint manager even if we don't have a client to allow local get/set of ResourcesAllocated & Resize
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
stateImpl, err := state.NewStateCheckpoint(m.stateFileDirectory, podStatusManagerStateFile)
if err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Check if this error should be a critical (stop kubelet) failure
klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
return
}
m.state = stateImpl
}
// Don't start the status manager if we don't have a client. This will happen // Don't start the status manager if we don't have a client. This will happen
// on the master, where the kubelet is responsible for bootstrapping the pods // on the master, where the kubelet is responsible for bootstrapping the pods
// of the master components. // of the master components.
@ -191,15 +202,6 @@ func (m *manager) Start() {
return return
} }
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
stateImpl, err := state.NewStateCheckpoint(m.stateFileDirectory, podStatusManagerStateFile)
if err != nil {
klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
return
}
m.state = stateImpl
}
klog.InfoS("Starting to sync pod status with apiserver") klog.InfoS("Starting to sync pod status with apiserver")
//nolint:staticcheck // SA1015 Ticker can leak since this is only called once and doesn't handle termination. //nolint:staticcheck // SA1015 Ticker can leak since this is only called once and doesn't handle termination.
@ -234,6 +236,9 @@ func (m *manager) State() state.Reader {
// SetPodAllocation checkpoints the resources allocated to a pod's containers // SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error { func (m *manager) SetPodAllocation(pod *v1.Pod) error {
if m.state == nil {
return fmt.Errorf("pod allocation checkpoint manager is not initialized")
}
m.podStatusesLock.RLock() m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock() defer m.podStatusesLock.RUnlock()
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
@ -250,6 +255,9 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
// SetPodResizeStatus checkpoints the last resizing decision for the pod. // SetPodResizeStatus checkpoints the last resizing decision for the pod.
func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error {
if m.state == nil {
return fmt.Errorf("pod allocation checkpoint manager is not initialized")
}
m.podStatusesLock.RLock() m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock() defer m.podStatusesLock.RUnlock()
return m.state.SetPodResizeStatus(string(podUID), resizeStatus) return m.state.SetPodResizeStatus(string(podUID), resizeStatus)
@ -672,6 +680,10 @@ func (m *manager) deletePodStatus(uid types.UID) {
delete(m.podStatuses, uid) delete(m.podStatuses, uid)
m.podStartupLatencyHelper.DeletePodStartupState(uid) m.podStartupLatencyHelper.DeletePodStartupState(uid)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if m.state == nil {
klog.ErrorS(nil, "pod allocation checkpoint manager is not initialized")
return
}
m.state.Delete(string(uid), "") m.state.Delete(string(uid), "")
} }
} }
@ -685,6 +697,10 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
klog.V(5).InfoS("Removing pod from status map.", "podUID", key) klog.V(5).InfoS("Removing pod from status map.", "podUID", key)
delete(m.podStatuses, key) delete(m.podStatuses, key)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if m.state == nil {
klog.ErrorS(nil, "pod allocation checkpoint manager is not initialized")
continue
}
m.state.Delete(string(key), "") m.state.Delete(string(key), "")
} }
} }