From 660bd6b42d159d81ebdf0772388983399a62f28e Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Wed, 5 Mar 2025 12:57:21 -0800 Subject: [PATCH 1/3] Track actuated resources in the allocation manager --- pkg/kubelet/allocation/allocation_manager.go | 90 ++++++++++++++------ pkg/kubelet/kubelet.go | 2 +- 2 files changed, 67 insertions(+), 25 deletions(-) diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go index a0930eef98a..e4b17627ca9 100644 --- a/pkg/kubelet/allocation/allocation_manager.go +++ b/pkg/kubelet/allocation/allocation_manager.go @@ -17,6 +17,8 @@ limitations under the License. package allocation import ( + "path/filepath" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/types" @@ -29,7 +31,10 @@ import ( ) // podStatusManagerStateFile is the file name where status manager stores its state -const podStatusManagerStateFile = "pod_status_manager_state" +const ( + allocatedPodsStateFile = "allocated_pods_state" + actuatedPodsStateFile = "actuated_pods_state" +) // AllocationManager tracks pod resource allocations. type Manager interface { @@ -42,56 +47,70 @@ type Manager interface { UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) // SetPodAllocation checkpoints the resources allocated to a pod's containers. - SetPodAllocation(pod *v1.Pod) error + SetPodAllocation(allocatedPod *v1.Pod) error - // DeletePodAllocation removes any stored state for the given pod UID. - DeletePodAllocation(uid types.UID) + // SetActuatedResources records the actuated resources of the given container (or the entire + // pod, if actuatedContainer is nil). + SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error + + // GetActuatedResources returns the stored actuated resources for the container, and whether they exist. + GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) + + // DeletePod removes any stored state for the given pod UID. + DeletePod(uid types.UID) // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. RemoveOrphanedPods(remainingPods sets.Set[types.UID]) } type manager struct { - state state.State + allocated state.State + actuated state.State } func NewManager(checkpointDirectory string) Manager { - m := &manager{} + return &manager{ + allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile), + actuated: newStateImpl(checkpointDirectory, actuatedPodsStateFile), + } +} - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile) - if err != nil { - // This is a crictical, non-recoverable failure. - klog.ErrorS(err, "Failed to initialize allocation checkpoint manager") - panic(err) - } - m.state = stateImpl - } else { - m.state = state.NewNoopStateCheckpoint() +func newStateImpl(checkpointDirectory, checkpointName string) state.State { + if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { + return state.NewNoopStateCheckpoint() } - return m + stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, checkpointName) + if err != nil { + // This is a critical, non-recoverable failure. 
+ klog.ErrorS(err, "Failed to initialize allocation checkpoint manager", + "checkpointPath", filepath.Join(checkpointDirectory, checkpointName)) + panic(err) + } + + return stateImpl } // NewInMemoryManager returns an allocation manager that doesn't persist state. // For testing purposes only! func NewInMemoryManager() Manager { return &manager{ - state: state.NewStateMemory(nil), + allocated: state.NewStateMemory(nil), + actuated: state.NewStateMemory(nil), } } // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values // If checkpoint manager has not been initialized, it returns nil, false func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { - return m.state.GetContainerResourceAllocation(podUID, containerName) + return m.allocated.GetContainerResourceAllocation(podUID, containerName) } // UpdatePodFromAllocation overwrites the pod spec with the allocation. // This function does a deep copy only if updates are needed. func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { // TODO(tallclair): This clones the whole cache, but we only need 1 pod. - allocs := m.state.GetPodResourceAllocation() + allocs := m.allocated.GetPodResourceAllocation() return updatePodFromAllocation(pod, allocs) } @@ -134,6 +153,10 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (* // SetPodAllocation checkpoints the resources allocated to a pod's containers func (m *manager) SetPodAllocation(pod *v1.Pod) error { + return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod)) +} + +func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements { podAlloc := make(map[string]v1.ResourceRequirements) for _, container := range pod.Spec.Containers { alloc := *container.Resources.DeepCopy() @@ -149,16 +172,35 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error { } } - return m.state.SetPodResourceAllocation(pod.UID, podAlloc) + return podAlloc } -func (m *manager) DeletePodAllocation(uid types.UID) { - if err := m.state.Delete(uid, ""); err != nil { +func (m *manager) DeletePod(uid types.UID) { + if err := m.allocated.Delete(uid, ""); err != nil { + // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. + klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) + } + + if err := m.actuated.Delete(uid, ""); err != nil { // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. 
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) } } func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { - m.state.RemoveOrphanedPods(remainingPods) + m.allocated.RemoveOrphanedPods(remainingPods) + m.actuated.RemoveOrphanedPods(remainingPods) +} + +func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error { + if actuatedContainer == nil { + alloc := allocationFromPod(allocatedPod) + return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc) + } + + return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources) +} + +func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { + return m.actuated.GetContainerResourceAllocation(podUID, containerName) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e22f9b81209..8a0bfdb2873 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2722,7 +2722,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.RemovePod(pod) - kl.allocationManager.DeletePodAllocation(pod.UID) + kl.allocationManager.DeletePod(pod.UID) pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { From d4444dd59862757a2a85a1f71d83350631a6a36e Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Wed, 5 Mar 2025 12:58:24 -0800 Subject: [PATCH 2/3] Use actuated resources to determine resize status --- pkg/kubelet/kubelet.go | 48 ++- pkg/kubelet/kubelet_pods.go | 81 ----- pkg/kubelet/kubelet_pods_test.go | 267 --------------- pkg/kubelet/kubelet_test.go | 303 +++++++++++++++--- .../kuberuntime/fake_kuberuntime_manager.go | 2 + .../kuberuntime/kuberuntime_container.go | 10 +- .../kuberuntime/kuberuntime_manager.go | 105 ++---- .../kuberuntime/kuberuntime_manager_test.go | 160 ++++----- 8 files changed, 409 insertions(+), 567 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8a0bfdb2873..3004a8d03e6 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -740,6 +740,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps.ContainerManager, klet.containerLogManager, klet.runtimeClassManager, + klet.allocationManager, seccompDefault, kubeCfg.MemorySwap.SwapBehavior, kubeDeps.ContainerManager.GetNodeAllocatableAbsolute, @@ -2886,8 +2887,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine if !updated { // Desired resources == allocated resources. Check whether a resize is in progress. - resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus) - if resizeInProgress { + if kl.isPodResizeInProgress(allocatedPod, podStatus) { // If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted. kl.statusManager.SetPodResizeStatus(pod.UID, v1.PodResizeStatusInProgress) } else { @@ -2928,11 +2928,11 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine } allocatedPod = pod - // Special case when the updated allocation matches the actual resources. This can occur + // Special case when the updated allocation matches the actuated resources. This can occur // when reverting a resize that hasn't been actuated, or when making an equivalent change // (such as CPU requests below MinShares). 
This is an optimization to clear the resize // status immediately, rather than waiting for the next SyncPod iteration. - if allocatedResourcesMatchStatus(allocatedPod, podStatus) { + if !kl.isPodResizeInProgress(allocatedPod, podStatus) { // In this case, consider the resize complete. kl.statusManager.SetPodResizeStatus(pod.UID, "") return allocatedPod, nil @@ -2952,6 +2952,46 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine return allocatedPod, nil } +// isPodResizingInProgress checks whether the actuated resizable resources differ from the allocated resources +// for any running containers. Specifically, the following differences are ignored: +// - Non-resizable containers: non-restartable init containers, ephemeral containers +// - Non-resizable resources: only CPU & memory are resizable +// - Non-actuated resources: memory requests are not actuated +// - Non-running containers: they will be sized correctly when (re)started +func (kl *Kubelet) isPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool { + return !podutil.VisitContainers(&allocatedPod.Spec, podutil.InitContainers|podutil.Containers, + func(allocatedContainer *v1.Container, containerType podutil.ContainerType) (shouldContinue bool) { + if !isResizableContainer(allocatedContainer, containerType) { + return true + } + + containerStatus := podStatus.FindContainerStatusByName(allocatedContainer.Name) + if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning { + // If the container isn't running, it doesn't need to be resized. + return true + } + + actuatedResources, _ := kl.allocationManager.GetActuatedResources(allocatedPod.UID, allocatedContainer.Name) + allocatedResources := allocatedContainer.Resources + + // Memory requests are excluded since they don't need to be actuated. + return allocatedResources.Requests[v1.ResourceCPU].Equal(actuatedResources.Requests[v1.ResourceCPU]) && + allocatedResources.Limits[v1.ResourceCPU].Equal(actuatedResources.Limits[v1.ResourceCPU]) && + allocatedResources.Limits[v1.ResourceMemory].Equal(actuatedResources.Limits[v1.ResourceMemory]) + }) +} + +func isResizableContainer(container *v1.Container, containerType podutil.ContainerType) bool { + switch containerType { + case podutil.InitContainers: + return podutil.IsRestartableInitContainer(container) + case podutil.Containers: + return true + default: + return false + } +} + // LatestLoopEntryTime returns the last time in the sync loop monitor. func (kl *Kubelet) LatestLoopEntryTime() time.Time { val := kl.syncLoopMonitor.Load() diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 52eba02b334..429507c9159 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -60,7 +60,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/kubelet/util/format" utilfs "k8s.io/kubernetes/pkg/util/filesystem" utilpod "k8s.io/kubernetes/pkg/util/pod" volumeutil "k8s.io/kubernetes/pkg/volume/util" @@ -1746,86 +1745,6 @@ func (kl *Kubelet) determinePodResizeStatus(allocatedPod *v1.Pod, podStatus *kub return resizeStatus } -// allocatedResourcesMatchStatus tests whether the resizeable resources in the pod spec match the -// resources reported in the status. 
-func allocatedResourcesMatchStatus(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool { - for _, c := range allocatedPod.Spec.Containers { - if !allocatedContainerResourcesMatchStatus(allocatedPod, &c, podStatus) { - return false - } - } - if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) { - for _, c := range allocatedPod.Spec.InitContainers { - if podutil.IsRestartableInitContainer(&c) && !allocatedContainerResourcesMatchStatus(allocatedPod, &c, podStatus) { - return false - } - } - } - return true -} - -// allocatedContainerResourcesMatchStatus returns true if the container resources matches with the container statuses resources. -func allocatedContainerResourcesMatchStatus(allocatedPod *v1.Pod, c *v1.Container, podStatus *kubecontainer.PodStatus) bool { - if cs := podStatus.FindContainerStatusByName(c.Name); cs != nil { - if cs.State != kubecontainer.ContainerStateRunning { - // If the container isn't running, it isn't resizing. - return true - } - - cpuReq, hasCPUReq := c.Resources.Requests[v1.ResourceCPU] - cpuLim, hasCPULim := c.Resources.Limits[v1.ResourceCPU] - memLim, hasMemLim := c.Resources.Limits[v1.ResourceMemory] - - if cs.Resources == nil { - if hasCPUReq || hasCPULim || hasMemLim { - // Container status is missing Resources information, but the container does - // have resizable resources configured. - klog.ErrorS(nil, "Missing runtime resources information for resizing container", - "pod", format.Pod(allocatedPod), "container", c.Name) - return false // We don't want to clear resize status with insufficient information. - } else { - // No resizable resources configured; this might be ok. - return true - } - } - - // Only compare resizeable resources, and only compare resources that are explicitly configured. - if hasCPUReq { - if cs.Resources.CPURequest == nil { - if !cpuReq.IsZero() { - return false - } - } else if !cpuReq.Equal(*cs.Resources.CPURequest) && - (cpuReq.MilliValue() > cm.MinShares || cs.Resources.CPURequest.MilliValue() > cm.MinShares) { - // If both allocated & status CPU requests are at or below MinShares then they are considered equal. - return false - } - } - if hasCPULim { - if cs.Resources.CPULimit == nil { - if !cpuLim.IsZero() { - return false - } - } else if !cpuLim.Equal(*cs.Resources.CPULimit) && - (cpuLim.MilliValue() > cm.MinMilliCPULimit || cs.Resources.CPULimit.MilliValue() > cm.MinMilliCPULimit) { - // If both allocated & status CPU limits are at or below the minimum limit, then they are considered equal. - return false - } - } - if hasMemLim { - if cs.Resources.MemoryLimit == nil { - if !memLim.IsZero() { - return false - } - } else if !memLim.Equal(*cs.Resources.MemoryLimit) { - return false - } - } - } - - return true -} - // generateAPIPodStatus creates the final API pod status for a pod, given the // internal pod status. This method should only be called from within sync*Pod methods. 
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podIsTerminal bool) v1.PodStatus { diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 59940865cc4..e4d3d20093f 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -6683,270 +6683,3 @@ func TestResolveRecursiveReadOnly(t *testing.T) { } } } - -func TestAllocatedResourcesMatchStatus(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, true) - containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways - tests := []struct { - name string - allocatedResources v1.ResourceRequirements - statusResources *kubecontainer.ContainerResources - statusTerminated bool - expectMatch bool - }{{ - name: "guaranteed pod: match", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(100, resource.DecimalSI), - MemoryLimit: resource.NewScaledQuantity(100, 6), - }, - expectMatch: true, - }, { - name: "guaranteed pod: cpu request mismatch", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(50, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(100, resource.DecimalSI), - MemoryLimit: resource.NewScaledQuantity(100, 6), - }, - expectMatch: false, - }, { - name: "guaranteed pod: cpu limit mismatch", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(50, resource.DecimalSI), - MemoryLimit: resource.NewScaledQuantity(100, 6), - }, - expectMatch: false, - }, { - name: "guaranteed pod: memory limit mismatch", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(100, resource.DecimalSI), - MemoryLimit: resource.NewScaledQuantity(50, 6), - }, - expectMatch: false, - }, { - name: "guaranteed pod: terminated mismatch", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - Limits: v1.ResourceList{ - 
v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(100, resource.DecimalSI), - MemoryLimit: resource.NewScaledQuantity(50, 6), - }, - statusTerminated: true, - expectMatch: true, - }, { - name: "burstable: no cpu request", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(2, resource.DecimalSI), - }, - expectMatch: true, - }, { - name: "burstable: min cpu request", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceMemory: resource.MustParse("100M"), - v1.ResourceCPU: resource.MustParse("2m"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(2, resource.DecimalSI), - }, - expectMatch: true, - }, { - name: "burstable: below min cpu request", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceMemory: resource.MustParse("100M"), - v1.ResourceCPU: resource.MustParse("1m"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(2, resource.DecimalSI), - }, - expectMatch: true, - }, { - name: "burstable: min cpu limit", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("10m"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("10m"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(10, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(10, resource.DecimalSI), - }, - expectMatch: true, - }, { - name: "burstable: below min cpu limit", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("5m"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("5m"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(5, resource.DecimalSI), - CPULimit: resource.NewMilliQuantity(10, resource.DecimalSI), - }, - expectMatch: true, - }, { - name: "best effort", - allocatedResources: v1.ResourceRequirements{}, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(2, resource.DecimalSI), - }, - expectMatch: true, - }, { - name: "nil status resources: cpu request mismatch", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - }, - }, - statusResources: &kubecontainer.ContainerResources{}, - expectMatch: false, - }, { - name: "nil status resources: cpu limit mismatch", - allocatedResources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - }, - }, - statusResources: &kubecontainer.ContainerResources{ - CPURequest: resource.NewMilliQuantity(2, resource.DecimalSI), - }, - expectMatch: false, - }, { - name: "nil status resources: memory limit mismatch", - allocatedResources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - v1.ResourceMemory: resource.MustParse("100M"), - }, - }, - statusResources: &kubecontainer.ContainerResources{}, - expectMatch: 
false, - }} - - for _, test := range tests { - for _, isSidecarContainer := range []bool{false, true} { - if isSidecarContainer { - test.name += " " + "for sidecar containers" - } - t.Run(test.name, func(t *testing.T) { - var podStatus *kubecontainer.PodStatus - state := kubecontainer.ContainerStateRunning - if test.statusTerminated { - state = kubecontainer.ContainerStateExited - } - - allocatedPod := v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - } - - if isSidecarContainer { - allocatedPod.Spec = v1.PodSpec{ - InitContainers: []v1.Container{{ - Name: "c1-init", - Resources: test.allocatedResources, - RestartPolicy: &containerRestartPolicyAlways, - }}, - } - podStatus = &kubecontainer.PodStatus{ - Name: "test", - ContainerStatuses: []*kubecontainer.Status{ - { - Name: "c1-init", - State: state, - Resources: test.statusResources, - }, - }, - } - } else { - allocatedPod.Spec = v1.PodSpec{ - Containers: []v1.Container{{ - Name: "c", - Resources: test.allocatedResources, - }}, - } - podStatus = &kubecontainer.PodStatus{ - Name: "test", - ContainerStatuses: []*kubecontainer.Status{ - { - Name: "c", - State: state, - Resources: test.statusResources, - }, - }, - } - } - match := allocatedResourcesMatchStatus(&allocatedPod, podStatus) - assert.Equal(t, test.expectMatch, match) - }) - } - } -} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index ab64cb20ddd..1d1ec052027 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2595,7 +2595,6 @@ func TestHandlePodResourcesResize(t *testing.T) { kubelet := testKubelet.kubelet containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways - cpu1m := resource.MustParse("1m") cpu2m := resource.MustParse("2m") cpu500m := resource.MustParse("500m") cpu1000m := resource.MustParse("1") @@ -2804,24 +2803,6 @@ func TestHandlePodResourcesResize(t *testing.T) { expectedResize: v1.PodResizeStatusInProgress, expectBackoffReset: true, }, - { - name: "Equivalent min CPU shares", - originalRequests: v1.ResourceList{v1.ResourceCPU: cpu1m}, - newRequests: v1.ResourceList{v1.ResourceCPU: cpu2m}, - expectedAllocatedReqs: v1.ResourceList{v1.ResourceCPU: cpu2m}, - expectedResize: "", - // Even though the resize isn't being actuated, we still clear the container backoff - // since the allocation is changing. 
- expectBackoffReset: true, - }, - { - name: "Equivalent min CPU shares - already allocated", - originalRequests: v1.ResourceList{v1.ResourceCPU: cpu2m}, - newRequests: v1.ResourceList{v1.ResourceCPU: cpu1m}, - newResourcesAllocated: true, - expectedAllocatedReqs: v1.ResourceList{v1.ResourceCPU: cpu1m}, - expectedResize: "", - }, { name: "Increase CPU from min limit", originalRequests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10m")}, @@ -2844,35 +2825,11 @@ func TestHandlePodResourcesResize(t *testing.T) { expectedResize: v1.PodResizeStatusInProgress, expectBackoffReset: true, }, - { - name: "Equivalent min CPU limit", - originalRequests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - originalLimits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - newRequests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, // Unchanged - newLimits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10m")}, - expectedAllocatedReqs: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - expectedAllocatedLims: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10m")}, - expectedResize: "", - // Even though the resize isn't being actuated, we still clear the container backoff - // since the allocation is changing. - expectBackoffReset: true, - }, - { - name: "Equivalent min CPU limit - already allocated", - originalRequests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - originalLimits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10m")}, - newRequests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, // Unchanged - newLimits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - expectedAllocatedReqs: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - expectedAllocatedLims: v1.ResourceList{v1.ResourceCPU: resource.MustParse("5m")}, - newResourcesAllocated: true, - expectedResize: "", - }, } for _, tt := range tests { for _, isSidecarContainer := range []bool{false, true} { - t.Run(tt.name, func(t *testing.T) { + t.Run(fmt.Sprintf("%s/sidecar=%t", tt.name, isSidecarContainer), func(t *testing.T) { kubelet.statusManager = status.NewFakeManager() var originalPod *v1.Pod @@ -2905,6 +2862,8 @@ func TestHandlePodResourcesResize(t *testing.T) { } else { require.NoError(t, kubelet.allocationManager.SetPodAllocation(newPod)) } + require.NoError(t, kubelet.allocationManager.SetActuatedResources(originalPod, nil)) + t.Cleanup(func() { kubelet.allocationManager.DeletePod(originalPod.UID) }) podStatus := &kubecontainer.PodStatus{ ID: originalPod.UID, @@ -3458,6 +3417,7 @@ func TestSyncPodSpans(t *testing.T) { kubelet.containerManager, kubelet.containerLogManager, kubelet.runtimeClassManager, + kubelet.allocationManager, false, kubeCfg.MemorySwap.SwapBehavior, kubelet.containerManager.GetNodeAllocatableAbsolute, @@ -3715,3 +3675,258 @@ func TestRecordAdmissionRejection(t *testing.T) { }) } } + +func TestIsPodResizeInProgress(t *testing.T) { + type testResources struct { + cpuReq, cpuLim, memReq, memLim int64 + } + type testContainer struct { + allocated testResources + actuated *testResources + nonSidecarInit, sidecar bool + isRunning bool + unstarted bool // Whether the container is missing from the pod status + } + + tests := []struct { + name string + containers []testContainer + expectHasResize bool + }{{ + name: "simple running container", + containers: []testContainer{{ + allocated: testResources{100, 100, 100, 100}, + actuated: &testResources{100, 100, 100, 100}, + isRunning: true, + }}, + expectHasResize: 
false, + }, { + name: "simple unstarted container", + containers: []testContainer{{ + allocated: testResources{100, 100, 100, 100}, + unstarted: true, + }}, + expectHasResize: false, + }, { + name: "simple resized container/cpu req", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{150, 200, 100, 200}, + isRunning: true, + }}, + expectHasResize: true, + }, { + name: "simple resized container/cpu limit", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 300, 100, 200}, + isRunning: true, + }}, + expectHasResize: true, + }, { + name: "simple resized container/mem req", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 150, 200}, + isRunning: true, + }}, + // Memory requests aren't actuated and should be ignored. + expectHasResize: false, + }, { + name: "simple resized container/cpu+mem req", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{150, 200, 150, 200}, + isRunning: true, + }}, + expectHasResize: true, + }, { + name: "simple resized container/mem limit", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 100, 300}, + isRunning: true, + }}, + expectHasResize: true, + }, { + name: "terminated resized container", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{200, 200, 100, 200}, + isRunning: false, + }}, + expectHasResize: false, + }, { + name: "non-sidecar init container", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + nonSidecarInit: true, + isRunning: true, + }, { + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 100, 200}, + isRunning: true, + }}, + expectHasResize: false, + }, { + name: "non-resized sidecar", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 100, 200}, + sidecar: true, + isRunning: true, + }, { + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 100, 200}, + isRunning: true, + }}, + expectHasResize: false, + }, { + name: "resized sidecar", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{200, 200, 100, 200}, + sidecar: true, + isRunning: true, + }, { + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 100, 200}, + isRunning: true, + }}, + expectHasResize: true, + }, { + name: "several containers and a resize", + containers: []testContainer{{ + allocated: testResources{100, 200, 100, 200}, + nonSidecarInit: true, + isRunning: true, + }, { + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{100, 200, 100, 200}, + isRunning: true, + }, { + allocated: testResources{100, 200, 100, 200}, + unstarted: true, + }, { + allocated: testResources{100, 200, 100, 200}, + actuated: &testResources{200, 200, 100, 200}, // Resized + isRunning: true, + }}, + expectHasResize: true, + }, { + name: "best-effort pod", + containers: []testContainer{{ + allocated: testResources{}, + actuated: &testResources{}, + isRunning: true, + }}, + expectHasResize: false, + }, { + name: "burstable pod/not resizing", + containers: []testContainer{{ + allocated: testResources{cpuReq: 100}, + actuated: &testResources{cpuReq: 100}, + isRunning: true, + }}, + 
expectHasResize: false, + }, { + name: "burstable pod/resized", + containers: []testContainer{{ + allocated: testResources{cpuReq: 100}, + actuated: &testResources{cpuReq: 500}, + isRunning: true, + }}, + expectHasResize: true, + }} + + mkRequirements := func(r testResources) v1.ResourceRequirements { + res := v1.ResourceRequirements{ + Requests: v1.ResourceList{}, + Limits: v1.ResourceList{}, + } + if r.cpuReq != 0 { + res.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(r.cpuReq, resource.DecimalSI) + } + if r.cpuLim != 0 { + res.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(r.cpuLim, resource.DecimalSI) + } + if r.memReq != 0 { + res.Requests[v1.ResourceMemory] = *resource.NewQuantity(r.memReq, resource.DecimalSI) + } + if r.memLim != 0 { + res.Limits[v1.ResourceMemory] = *resource.NewQuantity(r.memLim, resource.DecimalSI) + } + return res + } + mkContainer := func(index int, c testContainer) v1.Container { + container := v1.Container{ + Name: fmt.Sprintf("c%d", index), + Resources: mkRequirements(c.allocated), + } + if c.sidecar { + container.RestartPolicy = ptr.To(v1.ContainerRestartPolicyAlways) + } + return container + } + + testKubelet := newTestKubelet(t, false) + defer testKubelet.Cleanup() + kl := testKubelet.kubelet + am := kl.allocationManager + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + UID: "12345", + }, + } + t.Cleanup(func() { am.DeletePod(pod.UID) }) + podStatus := &kubecontainer.PodStatus{ + ID: pod.UID, + Name: pod.Name, + } + for i, c := range test.containers { + // Add the container to the pod + container := mkContainer(i, c) + if c.nonSidecarInit || c.sidecar { + pod.Spec.InitContainers = append(pod.Spec.InitContainers, container) + } else { + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + + // Add the container to the pod status, if it's started. 
+ if !test.containers[i].unstarted { + cs := kubecontainer.Status{ + Name: container.Name, + } + if test.containers[i].isRunning { + cs.State = kubecontainer.ContainerStateRunning + } else { + cs.State = kubecontainer.ContainerStateExited + } + podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &cs) + } + + // Register the actuated container (if needed) + if c.actuated != nil { + actuatedContainer := container.DeepCopy() + actuatedContainer.Resources = mkRequirements(*c.actuated) + require.NoError(t, am.SetActuatedResources(pod, actuatedContainer)) + + fetched, found := am.GetActuatedResources(pod.UID, container.Name) + require.True(t, found) + assert.Equal(t, actuatedContainer.Resources, fetched) + } else { + _, found := am.GetActuatedResources(pod.UID, container.Name) + require.False(t, found) + } + } + require.NoError(t, am.SetPodAllocation(pod)) + + hasResizedResources := kl.isPodResizeInProgress(pod, podStatus) + require.Equal(t, test.expectHasResize, hasResizedResources, "hasResizedResources") + }) + } +} diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index 5c125f17572..ade8c114251 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -32,6 +32,7 @@ import ( "k8s.io/component-base/logs/logreduction" internalapi "k8s.io/cri-api/pkg/apis" "k8s.io/kubernetes/pkg/credentialprovider" + "k8s.io/kubernetes/pkg/kubelet/allocation" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/images" @@ -119,6 +120,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS logManager: logManager, memoryThrottlingFactor: 0.9, podLogsDirectory: fakePodLogsDirectory, + allocationManager: allocation.NewInMemoryManager(), } typedVersion, err := runtimeService.Version(ctx, kubeRuntimeAPIVersion) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index a8c799e829d..7ecf5f74ca7 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -256,6 +256,12 @@ func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandb return s.Message(), ErrCreateContainerConfig } + // When creating a container, mark the resources as actuated. 
+ if err := m.allocationManager.SetActuatedResources(pod, container); err != nil { + m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", err) + return err.Error(), ErrCreateContainerConfig + } + err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig) if err != nil { s, _ := grpcstatus.FromError(err) @@ -399,8 +405,8 @@ func (m *kubeGenericRuntimeManager) updateContainerResources(pod *v1.Pod, contai } ctx := context.Background() err := m.runtimeService.UpdateContainerResources(ctx, containerID.ID, containerResources) - if err != nil { - klog.ErrorS(err, "UpdateContainerResources failed", "container", containerID.String()) + if err == nil { + err = m.allocationManager.SetActuatedResources(pod, container) } return err } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index dd90ef2babe..167fd4a8f80 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/credentialprovider/plugin" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/allocation" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" @@ -58,7 +59,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/metrics" - "k8s.io/kubernetes/pkg/kubelet/pleg" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/runtimeclass" "k8s.io/kubernetes/pkg/kubelet/sysctl" @@ -156,6 +156,9 @@ type kubeGenericRuntimeManager struct { // Manage RuntimeClass resources. runtimeClassManager *runtimeclass.Manager + // Manager allocated & actuated resources. + allocationManager allocation.Manager + // Cache last per-container error message to reduce log spam logReduction *logreduction.LogReduction @@ -213,6 +216,7 @@ func NewKubeGenericRuntimeManager( containerManager cm.ContainerManager, logManager logs.ContainerLogManager, runtimeClassManager *runtimeclass.Manager, + allocationManager allocation.Manager, seccompDefault bool, memorySwapBehavior string, getNodeAllocatable func() v1.ResourceList, @@ -242,6 +246,7 @@ func NewKubeGenericRuntimeManager( internalLifecycle: containerManager.InternalContainerLifecycle(), logManager: logManager, runtimeClassManager: runtimeClassManager, + allocationManager: allocationManager, logReduction: logreduction.NewLogReduction(identicalErrorDelay), seccompDefault: seccompDefault, memorySwapBehavior: memorySwapBehavior, @@ -550,6 +555,15 @@ func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) boo return cStatus.State == kubecontainer.ContainerStateExited && cStatus.ExitCode == 0 } +func containerResourcesFromRequirements(requirements *v1.ResourceRequirements) containerResources { + return containerResources{ + memoryLimit: requirements.Limits.Memory().Value(), + memoryRequest: requirements.Requests.Memory().Value(), + cpuLimit: requirements.Limits.Cpu().MilliValue(), + cpuRequest: requirements.Requests.Cpu().MilliValue(), + } +} + // computePodResizeAction determines the actions required (if any) to resize the given container. // Returns whether to keep (true) or restart (false) the container. // TODO(vibansal): Make this function to be agnostic to whether it is dealing with a restartable init container or not (i.e. 
remove the argument `isRestartableInitContainer`). @@ -573,47 +587,17 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe return true } - if kubeContainerStatus.Resources == nil { - // Not enough information to actuate a resize. - klog.V(4).InfoS("Missing runtime resource information for container", "pod", klog.KObj(pod), "container", container.Name) - return true + actuatedResources, found := m.allocationManager.GetActuatedResources(pod.UID, container.Name) + if !found { + klog.ErrorS(nil, "Missing actuated resource record", "pod", klog.KObj(pod), "container", container.Name) + // Proceed with the zero-value actuated resources. For restart NotRequired, this may + // result in an extra call to UpdateContainerResources, but that call should be idempotent. + // For RestartContainer, this may trigger a container restart. } - desiredResources := containerResources{ - memoryLimit: container.Resources.Limits.Memory().Value(), - memoryRequest: container.Resources.Requests.Memory().Value(), - cpuLimit: container.Resources.Limits.Cpu().MilliValue(), - cpuRequest: container.Resources.Requests.Cpu().MilliValue(), - } + desiredResources := containerResourcesFromRequirements(&container.Resources) + currentResources := containerResourcesFromRequirements(&actuatedResources) - currentResources := containerResources{ - // memoryRequest isn't set by the runtime, so default it to the desired. - memoryRequest: desiredResources.memoryRequest, - } - if kubeContainerStatus.Resources.MemoryLimit != nil { - currentResources.memoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value() - } - if kubeContainerStatus.Resources.CPULimit != nil { - currentResources.cpuLimit = kubeContainerStatus.Resources.CPULimit.MilliValue() - } - if kubeContainerStatus.Resources.CPURequest != nil { - currentResources.cpuRequest = kubeContainerStatus.Resources.CPURequest.MilliValue() - } - // Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during - // handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest. - - // Special case for minimum CPU request - if desiredResources.cpuRequest <= cm.MinShares && currentResources.cpuRequest <= cm.MinShares { - // If both desired & current CPU requests are at or below MinShares, - // then consider these equal. - desiredResources.cpuRequest = currentResources.cpuRequest - } - // Special case for minimum CPU limit - if desiredResources.cpuLimit <= cm.MinMilliCPULimit && currentResources.cpuLimit <= cm.MinMilliCPULimit { - // If both desired & current CPU limit are at or below the minimum effective limit, - // then consider these equal. - desiredResources.cpuLimit = currentResources.cpuLimit - } if currentResources == desiredResources { // No resize required. return true @@ -755,6 +739,11 @@ func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podContainerC } return err } + + // Always update the pod status once. Even if there was a resize error, the resize may have been + // partially actuated. 
+ defer m.runtimeHelper.SetPodWatchCondition(pod.UID, "doPodResizeAction", func(*kubecontainer.PodStatus) bool { return true }) + if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources { currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory) if err != nil { @@ -846,35 +835,6 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res "pod", format.Pod(pod), "resourceName", resourceName) return err } - resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName) - - // Watch (poll) the container for the expected resources update. Stop watching once the resources - // match the desired values. - resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool { - if status.Resources == nil { - return false - } - switch resourceName { - case v1.ResourceMemory: - actualLimit := nonNilQuantity(status.Resources.MemoryLimit) - return actualLimit.Equal(*container.Resources.Limits.Memory()) - case v1.ResourceCPU: - actualLimit := nonNilQuantity(status.Resources.CPULimit) - actualRequest := nonNilQuantity(status.Resources.CPURequest) - desiredLimit := container.Resources.Limits.Cpu() - desiredRequest := container.Resources.Requests.Cpu() - // Consider limits equal if both are at or below the effective minimum limit. - equalLimits := actualLimit.Equal(*desiredLimit) || (actualLimit.MilliValue() <= cm.MinMilliCPULimit && - desiredLimit.MilliValue() <= cm.MinMilliCPULimit) - // Consider requests equal if both are at or below MinShares. - equalRequests := actualRequest.Equal(*desiredRequest) || (actualRequest.MilliValue() <= cm.MinShares && - desiredRequest.MilliValue() <= cm.MinShares) - return equalLimits && equalRequests - default: - return true // Shouldn't happen. - } - }) - m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition) // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime. // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources. @@ -891,15 +851,6 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res return nil } -// nonNilQuantity returns a non-nil quantity. If the input is non-nil, it is returned. Otherwise a -// pointer to the zero value is returned. -func nonNilQuantity(q *resource.Quantity) *resource.Quantity { - if q != nil { - return q - } - return &resource.Quantity{} -} - // computePodActions checks whether the pod spec has changed and returns the changes if true. 
func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod)) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index 2df2fee6fa6..74b28daf061 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -2558,9 +2558,6 @@ func TestComputePodActionsForPodResize(t *testing.T) { m.machineInfo.MemoryCapacity = 17179860387 // 16GB assert.NoError(t, err) - cpu1m := resource.MustParse("1m") - cpu2m := resource.MustParse("2m") - cpu10m := resource.MustParse("10m") cpu100m := resource.MustParse("100m") cpu200m := resource.MustParse("200m") mem100M := resource.MustParse("100Mi") @@ -2570,22 +2567,28 @@ func TestComputePodActionsForPodResize(t *testing.T) { cpuPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceCPU, RestartPolicy: v1.RestartContainer} memPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceMemory, RestartPolicy: v1.RestartContainer} + setupActuatedResources := func(pod *v1.Pod, container *v1.Container, actuatedResources v1.ResourceRequirements) { + actuatedContainer := container.DeepCopy() + actuatedContainer.Resources = actuatedResources + require.NoError(t, m.allocationManager.SetActuatedResources(pod, actuatedContainer)) + } + for desc, test := range map[string]struct { - setupFn func(*v1.Pod, *kubecontainer.PodStatus) + setupFn func(*v1.Pod) getExpectedPodActionsFn func(*v1.Pod, *kubecontainer.PodStatus) *podActions }{ "Update container CPU and memory resources": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + setupFn: func(pod *v1.Pod) { c := &pod.Spec.Containers[1] c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, } - if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { - cStatus.Resources = &kubecontainer.ContainerResources{ - CPULimit: ptr.To(cpu200m.DeepCopy()), - MemoryLimit: ptr.To(mem200M.DeepCopy()), - } - } + setupActuatedResources(pod, c, v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: cpu200m.DeepCopy(), + v1.ResourceMemory: mem200M.DeepCopy(), + }, + }) }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[1].Name) @@ -2628,17 +2631,17 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU resources": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + setupFn: func(pod *v1.Pod) { c := &pod.Spec.Containers[1] c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, } - if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { - cStatus.Resources = &kubecontainer.ContainerResources{ - CPULimit: ptr.To(cpu200m.DeepCopy()), - MemoryLimit: ptr.To(mem100M.DeepCopy()), - } - } + setupActuatedResources(pod, c, v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: cpu200m.DeepCopy(), + v1.ResourceMemory: mem100M.DeepCopy(), + }, + }) }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[1].Name) @@ -2667,17 +2670,17 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container memory 
resources": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + setupFn: func(pod *v1.Pod) { c := &pod.Spec.Containers[2] c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { - cStatus.Resources = &kubecontainer.ContainerResources{ - CPULimit: ptr.To(cpu200m.DeepCopy()), - MemoryLimit: ptr.To(mem100M.DeepCopy()), - } - } + setupActuatedResources(pod, c, v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: cpu200m.DeepCopy(), + v1.ResourceMemory: mem100M.DeepCopy(), + }, + }) }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[2].Name) @@ -2706,16 +2709,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Nothing when spec.Resources and status.Resources are equal": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + setupFn: func(pod *v1.Pod) { c := &pod.Spec.Containers[1] c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, } - if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { - cStatus.Resources = &kubecontainer.ContainerResources{ - CPULimit: ptr.To(cpu200m.DeepCopy()), - } - } + setupActuatedResources(pod, c, v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: cpu200m.DeepCopy(), + }, + }) }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { pa := podActions{ @@ -2727,39 +2730,11 @@ func TestComputePodActionsForPodResize(t *testing.T) { return &pa }, }, - "Nothing when spec.Resources and status.Resources are equivalent": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + "Nothing when spec.Resources and status.Resources are equal (besteffort)": { + setupFn: func(pod *v1.Pod) { c := &pod.Spec.Containers[1] - c.Resources = v1.ResourceRequirements{} // best effort pod - if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { - cStatus.Resources = &kubecontainer.ContainerResources{ - CPURequest: ptr.To(cpu2m.DeepCopy()), - } - } - }, - getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { - pa := podActions{ - SandboxID: podStatus.SandboxStatuses[0].Id, - ContainersToKill: getKillMap(pod, podStatus, []int{}), - ContainersToStart: []int{}, - ContainersToUpdate: map[v1.ResourceName][]containerToUpdateInfo{}, - } - return &pa - }, - }, - "Update container CPU resources to equivalent value": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { - c := &pod.Spec.Containers[1] - c.Resources = v1.ResourceRequirements{ - Requests: v1.ResourceList{v1.ResourceCPU: cpu1m}, - Limits: v1.ResourceList{v1.ResourceCPU: cpu1m}, - } - if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { - cStatus.Resources = &kubecontainer.ContainerResources{ - CPURequest: ptr.To(cpu2m.DeepCopy()), - CPULimit: ptr.To(cpu10m.DeepCopy()), - } - } + c.Resources = v1.ResourceRequirements{} + setupActuatedResources(pod, c, v1.ResourceRequirements{}) }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { pa := podActions{ @@ -2772,18 +2747,18 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU and memory resources with Restart policy for CPU": { - setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + setupFn: func(pod 
*v1.Pod) {
 				c := &pod.Spec.Containers[0]
 				c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
 				c.Resources = v1.ResourceRequirements{
 					Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
 				}
-				if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
-					cStatus.Resources = &kubecontainer.ContainerResources{
-						CPULimit:    ptr.To(cpu100m.DeepCopy()),
-						MemoryLimit: ptr.To(mem100M.DeepCopy()),
-					}
-				}
+				setupActuatedResources(pod, c, v1.ResourceRequirements{
+					Limits: v1.ResourceList{
+						v1.ResourceCPU:    cpu100m.DeepCopy(),
+						v1.ResourceMemory: mem100M.DeepCopy(),
+					},
+				})
 			},
 			getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions {
 				kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[0].Name)
@@ -2803,18 +2778,18 @@ func TestComputePodActionsForPodResize(t *testing.T) {
 			},
 		},
 		"Update container CPU and memory resources with Restart policy for memory": {
-			setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
+			setupFn: func(pod *v1.Pod) {
 				c := &pod.Spec.Containers[2]
 				c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
 				c.Resources = v1.ResourceRequirements{
 					Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
 				}
-				if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
-					cStatus.Resources = &kubecontainer.ContainerResources{
-						CPULimit:    ptr.To(cpu100m.DeepCopy()),
-						MemoryLimit: ptr.To(mem100M.DeepCopy()),
-					}
-				}
+				setupActuatedResources(pod, c, v1.ResourceRequirements{
+					Limits: v1.ResourceList{
+						v1.ResourceCPU:    cpu100m.DeepCopy(),
+						v1.ResourceMemory: mem100M.DeepCopy(),
+					},
+				})
 			},
 			getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions {
 				kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[2].Name)
@@ -2834,18 +2809,18 @@ func TestComputePodActionsForPodResize(t *testing.T) {
 			},
 		},
 		"Update container memory resources with Restart policy for CPU": {
-			setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
+			setupFn: func(pod *v1.Pod) {
 				c := &pod.Spec.Containers[1]
 				c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
 				c.Resources = v1.ResourceRequirements{
 					Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem200M},
 				}
-				if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
-					cStatus.Resources = &kubecontainer.ContainerResources{
-						CPULimit:    ptr.To(cpu100m.DeepCopy()),
-						MemoryLimit: ptr.To(mem100M.DeepCopy()),
-					}
-				}
+				setupActuatedResources(pod, c, v1.ResourceRequirements{
+					Limits: v1.ResourceList{
+						v1.ResourceCPU:    cpu100m.DeepCopy(),
+						v1.ResourceMemory: mem100M.DeepCopy(),
+					},
+				})
 			},
 			getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions {
 				kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[1].Name)
@@ -2874,18 +2849,18 @@ func TestComputePodActionsForPodResize(t *testing.T) {
 			},
 		},
 		"Update container CPU resources with Restart policy for memory": {
-			setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
+			setupFn: func(pod *v1.Pod) {
 				c := &pod.Spec.Containers[2]
 				c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
 				c.Resources = v1.ResourceRequirements{
 					Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M},
 				}
-				if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
-					cStatus.Resources = &kubecontainer.ContainerResources{
-						CPULimit:    ptr.To(cpu100m.DeepCopy()),
-						MemoryLimit: ptr.To(mem100M.DeepCopy()),
-					}
-				}
+				setupActuatedResources(pod, c, v1.ResourceRequirements{
+					Limits: v1.ResourceList{
+						v1.ResourceCPU:    cpu100m.DeepCopy(),
+						v1.ResourceMemory: mem100M.DeepCopy(),
+					},
+				})
 			},
 			getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions {
 				kcs := podStatus.FindContainerStatusByName(pod.Spec.Containers[2].Name)
@@ -2921,8 +2896,9 @@ func TestComputePodActionsForPodResize(t *testing.T) {
 			pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired}
 		}
 		if test.setupFn != nil {
-			test.setupFn(pod, status)
+			test.setupFn(pod)
 		}
+		t.Cleanup(func() { m.allocationManager.DeletePod(pod.UID) })
 
 		for idx := range pod.Spec.Containers {
 			// compute hash

From 6d0b6278cd3eafeb1c1f1f440c22ec59e9d800bc Mon Sep 17 00:00:00 2001
From: Tim Allclair
Date: Mon, 10 Mar 2025 10:01:23 -0700
Subject: [PATCH 3/3] Rename some allocation.Manager methods

---
 pkg/kubelet/allocation/allocation_manager.go | 14 +++++++-------
 pkg/kubelet/kubelet.go | 6 +++---
 pkg/kubelet/kubelet_pods_test.go | 2 +-
 pkg/kubelet/kubelet_test.go | 12 ++++++------
 .../kuberuntime/kuberuntime_manager_test.go | 2 +-
 5 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go
index e4b17627ca9..576612c52a1 100644
--- a/pkg/kubelet/allocation/allocation_manager.go
+++ b/pkg/kubelet/allocation/allocation_manager.go
@@ -46,8 +46,8 @@ type Manager interface {
 	// Returns the updated (or original) pod, and whether there was an allocation stored.
 	UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
 
-	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
-	SetPodAllocation(allocatedPod *v1.Pod) error
+	// SetAllocatedResources checkpoints the resources allocated to a pod's containers.
+	SetAllocatedResources(allocatedPod *v1.Pod) error
 
 	// SetActuatedResources records the actuated resources of the given container (or the entire
 	// pod, if actuatedContainer is nil).
@@ -56,8 +56,8 @@ type Manager interface {
 	// GetActuatedResources returns the stored actuated resources for the container, and whether they exist.
 	GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
 
-	// DeletePod removes any stored state for the given pod UID.
-	DeletePod(uid types.UID)
+	// RemovePod removes any stored state for the given pod UID.
+	RemovePod(uid types.UID)
 
 	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
 	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
@@ -151,8 +151,8 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
 	return pod, updated
 }
 
-// SetPodAllocation checkpoints the resources allocated to a pod's containers
-func (m *manager) SetPodAllocation(pod *v1.Pod) error {
+// SetAllocatedResources checkpoints the resources allocated to a pod's containers
+func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
 	return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod))
 }
 
@@ -175,7 +175,7 @@ func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
 	return podAlloc
 }
 
-func (m *manager) DeletePod(uid types.UID) {
+func (m *manager) RemovePod(uid types.UID) {
 	if err := m.allocated.Delete(uid, ""); err != nil {
 		// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
 		klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 3004a8d03e6..a26e23b801e 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -2667,7 +2667,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 			continue
 		}
 		// For new pod, checkpoint the resource values at which the Pod has been admitted
-		if err := kl.allocationManager.SetPodAllocation(allocatedPod); err != nil {
+		if err := kl.allocationManager.SetAllocatedResources(allocatedPod); err != nil {
 			//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
 			klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
 		}
@@ -2723,7 +2723,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 	start := kl.clock.Now()
 	for _, pod := range pods {
 		kl.podManager.RemovePod(pod)
-		kl.allocationManager.DeletePod(pod.UID)
+		kl.allocationManager.RemovePod(pod.UID)
 
 		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
 		if wasMirror {
@@ -2909,7 +2909,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 	fit, resizeStatus, resizeMsg := kl.canResizePod(pod)
 	if fit {
 		// Update pod resource allocation checkpoint
-		if err := kl.allocationManager.SetPodAllocation(pod); err != nil {
+		if err := kl.allocationManager.SetAllocatedResources(pod); err != nil {
 			return nil, err
 		}
 		for i, container := range pod.Spec.Containers {
diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go
index e4d3d20093f..c48894cf82a 100644
--- a/pkg/kubelet/kubelet_pods_test.go
+++ b/pkg/kubelet/kubelet_pods_test.go
@@ -5093,7 +5093,7 @@ func TestConvertToAPIContainerStatusesForResources(t *testing.T) {
 			} else {
 				tPod.Spec.Containers[0].Resources = tc.Resources
 			}
-			err := kubelet.allocationManager.SetPodAllocation(tPod)
+			err := kubelet.allocationManager.SetAllocatedResources(tPod)
 			require.NoError(t, err)
 			resources := tc.ActualResources
 			if resources == nil {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 1d1ec052027..da00c6b3e9d 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -2568,7 +2568,7 @@ func TestPodResourceAllocationReset(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			if tc.existingPodAllocation != nil {
 				// when kubelet restarts, AllocatedResources has already existed before adding pod
-				err := kubelet.allocationManager.SetPodAllocation(tc.existingPodAllocation)
+				err := kubelet.allocationManager.SetAllocatedResources(tc.existingPodAllocation)
 				if err != nil {
 					t.Fatalf("failed to set pod allocation: %v", err)
 				}
@@ -2858,12 +2858,12 @@ func TestHandlePodResourcesResize(t *testing.T) {
 			}
 
 			if !tt.newResourcesAllocated {
-				require.NoError(t, kubelet.allocationManager.SetPodAllocation(originalPod))
+				require.NoError(t, kubelet.allocationManager.SetAllocatedResources(originalPod))
 			} else {
-				require.NoError(t, kubelet.allocationManager.SetPodAllocation(newPod))
+				require.NoError(t, kubelet.allocationManager.SetAllocatedResources(newPod))
 			}
 			require.NoError(t, kubelet.allocationManager.SetActuatedResources(originalPod, nil))
-			t.Cleanup(func() { kubelet.allocationManager.DeletePod(originalPod.UID) })
+			t.Cleanup(func() { kubelet.allocationManager.RemovePod(originalPod.UID) })
 
 			podStatus := &kubecontainer.PodStatus{
 				ID: originalPod.UID,
@@ -3882,7 +3882,7 @@ func TestIsPodResizeInProgress(t *testing.T) {
 			UID: "12345",
 		},
 	}
-	t.Cleanup(func() { am.DeletePod(pod.UID) })
+	t.Cleanup(func() { am.RemovePod(pod.UID) })
 	podStatus := &kubecontainer.PodStatus{
 		ID: pod.UID,
 		Name: pod.Name,
@@ -3923,7 +3923,7 @@ func TestIsPodResizeInProgress(t *testing.T) {
 				require.False(t, found)
 			}
 		}
-		require.NoError(t, am.SetPodAllocation(pod))
+		require.NoError(t, am.SetAllocatedResources(pod))
 
 		hasResizedResources := kl.isPodResizeInProgress(pod, podStatus)
 		require.Equal(t, test.expectHasResize, hasResizedResources, "hasResizedResources")
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
index 74b28daf061..c36a0e78f68 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
@@ -2898,7 +2898,7 @@ func TestComputePodActionsForPodResize(t *testing.T) {
 		if test.setupFn != nil {
 			test.setupFn(pod)
 		}
-		t.Cleanup(func() { m.allocationManager.DeletePod(pod.UID) })
+		t.Cleanup(func() { m.allocationManager.RemovePod(pod.UID) })
 
 		for idx := range pod.Spec.Containers {
 			// compute hash
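
Illustrative note, not part of the patches above: the sketch below is a minimal, hypothetical external test showing how a caller exercises the renamed allocation.Manager methods (SetAllocatedResources, formerly SetPodAllocation; RemovePod, formerly DeletePod; plus SetActuatedResources and GetActuatedResources). It assumes the standard k8s.io/kubernetes import paths and placement next to the allocation package, and uses the in-memory manager, which this series marks as test-only.

package allocation_test

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/allocation"
)

// TestRenamedAllocationManagerMethods is a hypothetical example, not part of the series.
func TestRenamedAllocationManagerMethods(t *testing.T) {
	// NewInMemoryManager skips checkpoint files; it is intended for tests only.
	am := allocation.NewInMemoryManager()

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{UID: "12345", Name: "test-pod"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "c1",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
				},
			}},
		},
	}

	// Formerly SetPodAllocation: checkpoint the resources the pod was admitted with.
	if err := am.SetAllocatedResources(pod); err != nil {
		t.Fatal(err)
	}
	// Record actuated resources for the whole pod (nil container), mirroring the
	// SetActuatedResources(originalPod, nil) call in kubelet_test.go above.
	if err := am.SetActuatedResources(pod, nil); err != nil {
		t.Fatal(err)
	}
	if actuated, ok := am.GetActuatedResources(pod.UID, "c1"); ok {
		t.Logf("actuated resources for c1: %+v", actuated)
	}

	// Formerly DeletePod: drop the stored state for this pod UID.
	am.RemovePod(pod.UID)
}

Per the RemovePod implementation in this patch, the call clears both the allocated and the actuated checkpoints, which is why HandlePodRemoves now calls it in place of DeletePod.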