diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go index a0930eef98a..e4b17627ca9 100644 --- a/pkg/kubelet/allocation/allocation_manager.go +++ b/pkg/kubelet/allocation/allocation_manager.go @@ -17,6 +17,8 @@ limitations under the License. package allocation import ( + "path/filepath" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/types" @@ -29,7 +31,10 @@ import ( ) // podStatusManagerStateFile is the file name where status manager stores its state -const podStatusManagerStateFile = "pod_status_manager_state" +const ( + allocatedPodsStateFile = "allocated_pods_state" + actuatedPodsStateFile = "actuated_pods_state" +) // AllocationManager tracks pod resource allocations. type Manager interface { @@ -42,56 +47,70 @@ type Manager interface { UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) // SetPodAllocation checkpoints the resources allocated to a pod's containers. - SetPodAllocation(pod *v1.Pod) error + SetPodAllocation(allocatedPod *v1.Pod) error - // DeletePodAllocation removes any stored state for the given pod UID. - DeletePodAllocation(uid types.UID) + // SetActuatedResources records the actuated resources of the given container (or the entire + // pod, if actuatedContainer is nil). + SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error + + // GetActuatedResources returns the stored actuated resources for the container, and whether they exist. + GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) + + // DeletePod removes any stored state for the given pod UID. + DeletePod(uid types.UID) // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. RemoveOrphanedPods(remainingPods sets.Set[types.UID]) } type manager struct { - state state.State + allocated state.State + actuated state.State } func NewManager(checkpointDirectory string) Manager { - m := &manager{} + return &manager{ + allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile), + actuated: newStateImpl(checkpointDirectory, actuatedPodsStateFile), + } +} - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile) - if err != nil { - // This is a crictical, non-recoverable failure. - klog.ErrorS(err, "Failed to initialize allocation checkpoint manager") - panic(err) - } - m.state = stateImpl - } else { - m.state = state.NewNoopStateCheckpoint() +func newStateImpl(checkpointDirectory, checkpointName string) state.State { + if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { + return state.NewNoopStateCheckpoint() } - return m + stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, checkpointName) + if err != nil { + // This is a critical, non-recoverable failure. + klog.ErrorS(err, "Failed to initialize allocation checkpoint manager", + "checkpointPath", filepath.Join(checkpointDirectory, checkpointName)) + panic(err) + } + + return stateImpl } // NewInMemoryManager returns an allocation manager that doesn't persist state. // For testing purposes only! func NewInMemoryManager() Manager { return &manager{ - state: state.NewStateMemory(nil), + allocated: state.NewStateMemory(nil), + actuated: state.NewStateMemory(nil), } } // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values // If checkpoint manager has not been initialized, it returns nil, false func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { - return m.state.GetContainerResourceAllocation(podUID, containerName) + return m.allocated.GetContainerResourceAllocation(podUID, containerName) } // UpdatePodFromAllocation overwrites the pod spec with the allocation. // This function does a deep copy only if updates are needed. func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { // TODO(tallclair): This clones the whole cache, but we only need 1 pod. - allocs := m.state.GetPodResourceAllocation() + allocs := m.allocated.GetPodResourceAllocation() return updatePodFromAllocation(pod, allocs) } @@ -134,6 +153,10 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (* // SetPodAllocation checkpoints the resources allocated to a pod's containers func (m *manager) SetPodAllocation(pod *v1.Pod) error { + return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod)) +} + +func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements { podAlloc := make(map[string]v1.ResourceRequirements) for _, container := range pod.Spec.Containers { alloc := *container.Resources.DeepCopy() @@ -149,16 +172,35 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error { } } - return m.state.SetPodResourceAllocation(pod.UID, podAlloc) + return podAlloc } -func (m *manager) DeletePodAllocation(uid types.UID) { - if err := m.state.Delete(uid, ""); err != nil { +func (m *manager) DeletePod(uid types.UID) { + if err := m.allocated.Delete(uid, ""); err != nil { + // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. + klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) + } + + if err := m.actuated.Delete(uid, ""); err != nil { // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) } } func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { - m.state.RemoveOrphanedPods(remainingPods) + m.allocated.RemoveOrphanedPods(remainingPods) + m.actuated.RemoveOrphanedPods(remainingPods) +} + +func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error { + if actuatedContainer == nil { + alloc := allocationFromPod(allocatedPod) + return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc) + } + + return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources) +} + +func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { + return m.actuated.GetContainerResourceAllocation(podUID, containerName) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e22f9b81209..8a0bfdb2873 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2722,7 +2722,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.RemovePod(pod) - kl.allocationManager.DeletePodAllocation(pod.UID) + kl.allocationManager.DeletePod(pod.UID) pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror {