diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go
new file mode 100644
index 00000000000..444f9d84149
--- /dev/null
+++ b/pkg/kubelet/allocation/allocation_manager.go
@@ -0,0 +1,165 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package allocation
+
+import (
+	v1 "k8s.io/api/core/v1"
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/klog/v2"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
+)
+
+// podStatusManagerStateFile is the file name where the allocation manager stores its state
+const podStatusManagerStateFile = "pod_status_manager_state"
+
+// Manager tracks pod resource allocations.
+type Manager interface {
+	// GetContainerResourceAllocation returns the AllocatedResources value for the container.
+	GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
+
+	// UpdatePodFromAllocation overwrites the pod spec with the allocation.
+	// This function does a deep copy only if updates are needed.
+	// Returns the updated (or original) pod, and whether there was an allocation stored.
+	UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
+
+	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
+	SetPodAllocation(pod *v1.Pod) error
+
+	// DeletePodAllocation removes any stored state for the given pod UID.
+	DeletePodAllocation(uid types.UID)
+
+	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
+	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
+}
+
+type manager struct {
+	state state.State
+}
+
+func NewManager(checkpointDirectory string) Manager {
+	m := &manager{}
+
+	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
+		stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile)
+		if err != nil {
+			// This is a critical, non-recoverable failure.
+			klog.ErrorS(err, "Failed to initialize allocation checkpoint manager")
+			panic(err)
+		}
+		m.state = stateImpl
+	} else {
+		m.state = state.NewNoopStateCheckpoint()
+	}
+
+	return m
+}
+
+// NewInMemoryManager returns an allocation manager that doesn't persist state.
+// For testing purposes only!
+func NewInMemoryManager() Manager {
+	return &manager{
+		state: state.NewStateMemory(nil),
+	}
+}
+
+// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values.
+// If the checkpoint manager has not been initialized, it returns an empty value and false.
+func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
+	return m.state.GetContainerResourceAllocation(podUID, containerName)
+}
+
+// UpdatePodFromAllocation overwrites the pod spec with the allocation.
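+// Returns the updated (or original) pod, and whether there was an allocation stored.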
+// This function does a deep copy only if updates are needed.
+func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
+	// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
+	allocs := m.state.GetPodResourceAllocation()
+	return updatePodFromAllocation(pod, allocs)
+}
+
+func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
+	allocated, found := allocs[string(pod.UID)]
+	if !found {
+		return pod, false
+	}
+
+	updated := false
+	containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
+		if cAlloc, ok := allocated[c.Name]; ok {
+			if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
+				// Allocation differs from pod spec, return the allocation
+				if !updated {
+					// If this is the first update to be performed, copy the pod
+					pod = pod.DeepCopy()
+					updated = true
+				}
+				return cAlloc, true
+			}
+		}
+		return v1.ResourceRequirements{}, false
+	}
+
+	for i, c := range pod.Spec.Containers {
+		if cAlloc, found := containerAlloc(c); found {
+			// Allocation differs from pod spec, update
+			pod.Spec.Containers[i].Resources = cAlloc
+		}
+	}
+	for i, c := range pod.Spec.InitContainers {
+		if cAlloc, found := containerAlloc(c); found {
+			// Allocation differs from pod spec, update
+			pod.Spec.InitContainers[i].Resources = cAlloc
+		}
+	}
+	return pod, updated
+}
+
+// SetPodAllocation checkpoints the resources allocated to a pod's containers.
+func (m *manager) SetPodAllocation(pod *v1.Pod) error {
+	podAlloc := make(map[string]v1.ResourceRequirements)
+	for _, container := range pod.Spec.Containers {
+		alloc := *container.Resources.DeepCopy()
+		podAlloc[container.Name] = alloc
+	}
+
+	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
+		for _, container := range pod.Spec.InitContainers {
+			if podutil.IsRestartableInitContainer(&container) {
+				alloc := *container.Resources.DeepCopy()
+				podAlloc[container.Name] = alloc
+			}
+		}
+	}
+
+	return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
+}
+
+func (m *manager) DeletePodAllocation(uid types.UID) {
+	if err := m.state.Delete(string(uid), ""); err != nil {
+		// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
+		klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
+	}
+}
+
+func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
+	m.state.RemoveOrphanedPods(remainingPods)
+}
diff --git a/pkg/kubelet/allocation/allocation_manager_test.go b/pkg/kubelet/allocation/allocation_manager_test.go
new file mode 100644
index 00000000000..64863c8efb5
--- /dev/null
+++ b/pkg/kubelet/allocation/allocation_manager_test.go
@@ -0,0 +1,165 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package allocation
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
+)
+
+func TestUpdatePodFromAllocation(t *testing.T) {
+	containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:       "12345",
+			Name:      "test",
+			Namespace: "default",
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					Name: "c1",
+					Resources: v1.ResourceRequirements{
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
+						},
+						Limits: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
+						},
+					},
+				},
+				{
+					Name: "c2",
+					Resources: v1.ResourceRequirements{
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
+						},
+						Limits: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
+						},
+					},
+				},
+			},
+			InitContainers: []v1.Container{
+				{
+					Name: "c1-restartable-init",
+					Resources: v1.ResourceRequirements{
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
+						},
+						Limits: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
+						},
+					},
+					RestartPolicy: &containerRestartPolicyAlways,
+				},
+				{
+					Name: "c1-init",
+					Resources: v1.ResourceRequirements{
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
+						},
+						Limits: v1.ResourceList{
+							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
+							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
+						},
+					},
+				},
+			},
+		},
+	}
+
+	resizedPod := pod.DeepCopy()
+	resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
+	resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
+
+	tests := []struct {
+		name         string
+		pod          *v1.Pod
+		allocs       state.PodResourceAllocation
+		expectPod    *v1.Pod
+		expectUpdate bool
+	}{{
+		name: "steady state",
+		pod:  pod,
+		allocs: state.PodResourceAllocation{
+			string(pod.UID): map[string]v1.ResourceRequirements{
+				"c1":                  *pod.Spec.Containers[0].Resources.DeepCopy(),
+				"c2":                  *pod.Spec.Containers[1].Resources.DeepCopy(),
+				"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
+				"c1-init":             *pod.Spec.InitContainers[1].Resources.DeepCopy(),
+			},
+		},
+		expectUpdate: false,
+	}, {
+		name:         "no allocations",
+		pod:          pod,
+		allocs:       state.PodResourceAllocation{},
+		expectUpdate: false,
+	}, {
+		name: "missing container allocation",
+		pod:  pod,
+		allocs: state.PodResourceAllocation{
+			string(pod.UID): map[string]v1.ResourceRequirements{
+				"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
+			},
+		},
+		expectUpdate: false,
+	}, {
+		name: "resized container",
+		pod:  pod,
+		allocs: state.PodResourceAllocation{
+			string(pod.UID): map[string]v1.ResourceRequirements{
+				"c1":                  *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
+				"c2":                  *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
+				"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
+				"c1-init":             *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
+			},
+		},
+		expectUpdate: true,
+		expectPod:    resizedPod,
+	}}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			pod := test.pod.DeepCopy()
+			allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
+
+			if test.expectUpdate {
+				assert.True(t, updated, "updated")
+				assert.Equal(t, test.expectPod, allocatedPod)
+				assert.NotEqual(t, pod, allocatedPod)
+			} else {
+				assert.False(t, updated, "updated")
+				assert.Same(t, pod, allocatedPod)
+			}
+		})
+	}
+}
diff --git a/pkg/kubelet/allocation/doc.go b/pkg/kubelet/allocation/doc.go
new file mode 100644
index 00000000000..349d31b41ec
--- /dev/null
+++ b/pkg/kubelet/allocation/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package allocation tracks pod resource allocations.
+package allocation
diff --git a/pkg/kubelet/status/state/checkpoint.go b/pkg/kubelet/allocation/state/checkpoint.go
similarity index 100%
rename from pkg/kubelet/status/state/checkpoint.go
rename to pkg/kubelet/allocation/state/checkpoint.go
diff --git a/pkg/kubelet/status/state/state.go b/pkg/kubelet/allocation/state/state.go
similarity index 89%
rename from pkg/kubelet/status/state/state.go
rename to pkg/kubelet/allocation/state/state.go
index 5f63b0466fc..4688b087839 100644
--- a/pkg/kubelet/status/state/state.go
+++ b/pkg/kubelet/allocation/state/state.go
@@ -18,6 +18,8 @@ package state
 
 import (
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 // PodResourceAllocation type is used in tracking resources allocated to pod's containers
@@ -48,6 +50,9 @@ type writer interface {
 	SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
 	SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
 	Delete(podUID string, containerName string) error
	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
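+	// It also acts as the retry path for deletions that previously failed.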
+	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
 }
 
 // State interface provides methods for tracking and setting pod resource allocation
diff --git a/pkg/kubelet/status/state/state_checkpoint.go b/pkg/kubelet/allocation/state/state_checkpoint.go
similarity index 92%
rename from pkg/kubelet/status/state/state_checkpoint.go
rename to pkg/kubelet/allocation/state/state_checkpoint.go
index 78ca388958d..b326f2ff084 100644
--- a/pkg/kubelet/status/state/state_checkpoint.go
+++ b/pkg/kubelet/allocation/state/state_checkpoint.go
@@ -22,6 +22,8 @@ import (
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@@ -139,6 +141,12 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
 	return sc.storeState()
 }
 
+func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
+	sc.cache.RemoveOrphanedPods(remainingPods)
+	// Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
+	// the orphaned pods will be removed the next time this method is called.
+}
+
 type noopStateCheckpoint struct{}
 
 // NewNoopStateCheckpoint creates a dummy state checkpoint manager
@@ -165,3 +173,5 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v
 func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
 	return nil
 }
+
+func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ sets.Set[types.UID]) {}
diff --git a/pkg/kubelet/status/state/state_checkpoint_test.go b/pkg/kubelet/allocation/state/state_checkpoint_test.go
similarity index 100%
rename from pkg/kubelet/status/state/state_checkpoint_test.go
rename to pkg/kubelet/allocation/state/state_checkpoint_test.go
diff --git a/pkg/kubelet/status/state/state_mem.go b/pkg/kubelet/allocation/state/state_mem.go
similarity index 90%
rename from pkg/kubelet/status/state/state_mem.go
rename to pkg/kubelet/allocation/state/state_mem.go
index fd65d119d12..116b6be65dc 100644
--- a/pkg/kubelet/status/state/state_mem.go
+++ b/pkg/kubelet/allocation/state/state_mem.go
@@ -20,6 +20,8 @@ import (
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 )
 
@@ -96,3 +98,14 @@ func (s *stateMemory) Delete(podUID string, containerName string) error {
 	s.deleteContainer(podUID, containerName)
 	return nil
 }
+
+func (s *stateMemory) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
+	s.Lock()
+	defer s.Unlock()
+
+	for podUID := range s.podAllocation {
+		if _, ok := remainingPods[types.UID(podUID)]; !ok {
+			delete(s.podAllocation, podUID)
+		}
+	}
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 3a7b221edcf..59d4158ee8f 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -77,6 +77,7 @@ import (
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/api/v1/resource"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/allocation"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
@@ -662,7 +663,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
 	klet.podManager = kubepod.NewBasicPodManager()
 
-	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
+	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
+	klet.allocationManager = allocation.NewManager(klet.getRootDir())
 
 	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
 
@@ -1147,6 +1149,9 @@ type Kubelet struct {
 	// consult the pod worker.
 	statusManager status.Manager
 
+	// allocationManager manages allocated resources for pods.
+	allocationManager allocation.Manager
+
 	// resyncInterval is the interval between periodic full reconciliations of
 	// pods on this node.
 	resyncInterval time.Duration
@@ -2638,7 +2643,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 		if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 			// To handle kubelet restarts, test pod admissibility using AllocatedResources values
 			// (for cpu & memory) from checkpoint store. If found, that is the source of truth.
-			allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod)
+			allocatedPod, _ := kl.allocationManager.UpdatePodFromAllocation(pod)
 
 			// Check if we can admit the pod; if not, reject it.
 			if ok, reason, message := kl.canAdmitPod(allocatedPods, allocatedPod); !ok {
@@ -2651,7 +2656,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 				continue
 			}
 			// For new pod, checkpoint the resource values at which the Pod has been admitted
-			if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil {
+			if err := kl.allocationManager.SetPodAllocation(allocatedPod); err != nil {
 				//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
 				klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
 			}
@@ -2707,6 +2712,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 	start := kl.clock.Now()
 	for _, pod := range pods {
 		kl.podManager.RemovePod(pod)
+		kl.allocationManager.DeletePodAllocation(pod.UID)
 
 		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
 		if wasMirror {
@@ -2870,7 +2876,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus, string)
 // calculations after this function is called. It also updates the cached ResizeStatus according to
 // the allocation decision and pod status.
 func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) {
-	allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
+	allocatedPod, updated := kl.allocationManager.UpdatePodFromAllocation(pod)
 	if !updated {
 		// Desired resources == allocated resources. Check whether a resize is in progress.
 		resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
@@ -2891,7 +2897,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 	fit, resizeStatus, resizeMsg := kl.canResizePod(pod)
 	if fit {
 		// Update pod resource allocation checkpoint
-		if err := kl.statusManager.SetPodAllocation(pod); err != nil {
+		if err := kl.allocationManager.SetPodAllocation(pod); err != nil {
 			return nil, err
 		}
 		for i, container := range pod.Spec.Containers {
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index 9903dd988e9..28378727e87 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -218,7 +218,7 @@ func (kl *Kubelet) getAllocatedPods() []*v1.Pod {
 
 	allocatedPods := make([]*v1.Pod, len(activePods))
 	for i, pod := range activePods {
-		allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod)
+		allocatedPods[i], _ = kl.allocationManager.UpdatePodFromAllocation(pod)
 	}
 	return allocatedPods
 }
@@ -1170,9 +1170,9 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
 	// desired pods. Pods that must be restarted due to UID reuse, or leftover
 	// pods from previous runs, are not known to the pod worker.
 
-	allPodsByUID := make(map[types.UID]*v1.Pod)
+	allPodsByUID := make(sets.Set[types.UID])
 	for _, pod := range allPods {
-		allPodsByUID[pod.UID] = pod
+		allPodsByUID.Insert(pod.UID)
 	}
 
 	// Identify the set of pods that have workers, which should be all pods
@@ -1219,6 +1219,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
 	// Remove orphaned pod statuses not in the total list of known config pods
 	klog.V(3).InfoS("Clean up orphaned pod statuses")
 	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
+	kl.allocationManager.RemoveOrphanedPods(allPodsByUID)
 
 	// Remove orphaned pod user namespace allocations (if any).
 	klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
@@ -2152,7 +2153,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 
 		// Always set the status to the latest allocated resources, even if it differs from the
 		// allocation used by the current sync loop.
-		alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
+		alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName)
 		if !found {
 			// This case is expected for non-resizable containers (ephemeral & non-restartable init containers).
 			// Don't set status.Resources in this case.
@@ -2372,7 +2373,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 			status.Resources = convertContainerStatusResources(cName, status, cStatus, oldStatuses)
 
 			if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingAllocatedStatus) {
-				if alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
+				if alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
 					status.AllocatedResources = alloc.Requests
 				}
 			}
diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go
index 13de133f6b3..59940865cc4 100644
--- a/pkg/kubelet/kubelet_pods_test.go
+++ b/pkg/kubelet/kubelet_pods_test.go
@@ -5093,7 +5093,8 @@ func TestConvertToAPIContainerStatusesForResources(t *testing.T) {
 			} else {
 				tPod.Spec.Containers[0].Resources = tc.Resources
 			}
-			kubelet.statusManager.SetPodAllocation(tPod)
+			err := kubelet.allocationManager.SetPodAllocation(tPod)
+			require.NoError(t, err)
 			resources := tc.ActualResources
 			if resources == nil {
 				resources = &kubecontainer.ContainerResources{
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 5d8b9d397d8..f46defb0d03 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -62,6 +62,8 @@ import (
 	fakeremote "k8s.io/cri-client/pkg/fake"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/allocation"
+	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
 	"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
@@ -90,7 +92,6 @@ import (
 	serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
 	"k8s.io/kubernetes/pkg/kubelet/stats"
 	"k8s.io/kubernetes/pkg/kubelet/status"
-	"k8s.io/kubernetes/pkg/kubelet/status/state"
 	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 	"k8s.io/kubernetes/pkg/kubelet/sysctl"
 	"k8s.io/kubernetes/pkg/kubelet/token"
@@ -272,7 +273,8 @@ func newTestKubeletWithImageList(
 	kubelet.mirrorPodClient = fakeMirrorClient
 	kubelet.podManager = kubepod.NewBasicPodManager()
 	podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
-	kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())
+	kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
+	kubelet.allocationManager = allocation.NewInMemoryManager()
 
 	kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
 	kubelet.containerRuntime = fakeRuntime
@@ -2566,14 +2568,14 @@ func TestPodResourceAllocationReset(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			if tc.existingPodAllocation != nil {
 				// when kubelet restarts, AllocatedResources has already existed before adding pod
-				err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
+				err := kubelet.allocationManager.SetPodAllocation(tc.existingPodAllocation)
 				if err != nil {
 					t.Fatalf("failed to set pod allocation: %v", err)
 				}
 			}
 			kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
 
-			allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
+			allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
 			if !found {
t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) } @@ -2903,9 +2905,9 @@ func TestHandlePodResourcesResize(t *testing.T) { } if !tt.newResourcesAllocated { - require.NoError(t, kubelet.statusManager.SetPodAllocation(originalPod)) + require.NoError(t, kubelet.allocationManager.SetPodAllocation(originalPod)) } else { - require.NoError(t, kubelet.statusManager.SetPodAllocation(newPod)) + require.NoError(t, kubelet.allocationManager.SetPodAllocation(newPod)) } podStatus := &kubecontainer.PodStatus{ @@ -2951,7 +2953,7 @@ func TestHandlePodResourcesResize(t *testing.T) { assert.Equal(t, tt.expectedAllocatedReqs, updatedPodCtr.Resources.Requests, "updated pod spec requests") assert.Equal(t, tt.expectedAllocatedLims, updatedPodCtr.Resources.Limits, "updated pod spec limits") - alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name) + alloc, found := kubelet.allocationManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name) require.True(t, found, "container allocation") assert.Equal(t, tt.expectedAllocatedReqs, alloc.Requests, "stored container request allocation") assert.Equal(t, tt.expectedAllocatedLims, alloc.Limits, "stored container limit allocation") diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index be435cd1518..5c9f9e0112f 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -17,7 +17,6 @@ limitations under the License. package prober import ( - "os" "reflect" "sync" @@ -114,14 +113,8 @@ func newTestManager() *manager { podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. 
 	podManager.AddPod(getTestPod())
-	testRootDir := ""
-	if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-		return nil
-	} else {
-		testRootDir = tempDir
-	}
 	m := NewManager(
-		status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
+		status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
 		results.NewManager(),
 		results.NewManager(),
 		results.NewManager(),
diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go
index ef21df14964..35c1da6f427 100644
--- a/pkg/kubelet/prober/scale_test.go
+++ b/pkg/kubelet/prober/scale_test.go
@@ -21,7 +21,6 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"os"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -81,16 +80,10 @@ func TestTCPPortExhaustion(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			testRootDir := ""
-			if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-				t.Fatalf("can't make a temp rootdir: %v", err)
-			} else {
-				testRootDir = tempDir
-			}
 			podManager := kubepod.NewBasicPodManager()
 			podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
 			m := NewManager(
-				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
+				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
 				results.NewManager(),
 				results.NewManager(),
 				results.NewManager(),
diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go
index fa852c5be3d..3e1fa335a16 100644
--- a/pkg/kubelet/prober/worker_test.go
+++ b/pkg/kubelet/prober/worker_test.go
@@ -19,7 +19,6 @@ package prober
 import (
 	"context"
 	"fmt"
-	"os"
 	"testing"
 	"time"
 
@@ -152,14 +151,7 @@ func TestDoProbe(t *testing.T) {
 			t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result)
 		}
 
-		// Clean up.
-		testRootDir := ""
-		if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-			t.Fatalf("can't make a temp rootdir: %v", err)
-		} else {
-			testRootDir = tempDir
-		}
-		m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
+		m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
 		resultsManager(m, probeType).Remove(testContainerID)
 	}
 }
diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go
index 8800fb01d48..d9c3da24402 100644
--- a/pkg/kubelet/status/fake_status_manager.go
+++ b/pkg/kubelet/status/fake_status_manager.go
@@ -19,16 +19,11 @@ package status
 import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/klog/v2"
-	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/features"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
-	"k8s.io/kubernetes/pkg/kubelet/status/state"
 )
 
 type fakeManager struct {
-	state             state.State
 	podResizeStatuses map[types.UID]v1.PodResizeStatus
 }
 
@@ -67,41 +62,10 @@ func (m *fakeManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	return
 }
 
-func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
-	klog.InfoS("GetContainerResourceAllocation()")
-	return m.state.GetContainerResourceAllocation(podUID, containerName)
-}
-
 func (m *fakeManager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
 	return m.podResizeStatuses[podUID]
 }
 
-func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
-	allocs := m.state.GetPodResourceAllocation()
-	return updatePodFromAllocation(pod, allocs)
-}
-
-func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
-	klog.InfoS("SetPodAllocation()")
-	for _, container := range pod.Spec.Containers {
-		alloc := *container.Resources.DeepCopy()
-		if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
-			return err
-		}
-	}
-	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
-		for _, container := range pod.Spec.InitContainers {
-			if podutil.IsRestartableInitContainer(&container) {
-				alloc := *container.Resources.DeepCopy()
-				if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
-					return err
-				}
-			}
-		}
-	}
-	return nil
-}
-
 func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
 	m.podResizeStatuses[podUID] = resizeStatus
 }
@@ -109,7 +73,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
 // NewFakeManager creates empty/fake memory manager
 func NewFakeManager() Manager {
 	return &fakeManager{
-		state:             state.NewStateMemory(state.PodResourceAllocation{}),
 		podResizeStatuses: make(map[types.UID]v1.PodResizeStatus),
 	}
 }
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index 3f50af2b4fc..323a2192ace 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -40,15 +40,11 @@ import (
 	"k8s.io/kubernetes/pkg/features"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
-	"k8s.io/kubernetes/pkg/kubelet/status/state"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util" statusutil "k8s.io/kubernetes/pkg/util/pod" ) -// podStatusManagerStateFile is the file name where status manager stores its state -const podStatusManagerStateFile = "pod_status_manager_state" - // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are // not sent to the API server. type versionedPodStatus struct { @@ -82,10 +78,6 @@ type manager struct { podDeletionSafety PodDeletionSafetyProvider podStartupLatencyHelper PodStartupLatencyStateHelper - // state allows to save/restore pod resource allocation and tolerate kubelet restarts. - state state.State - // stateFileDirectory holds the directory where the state file for checkpoints is held. - stateFileDirectory string } // PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. @@ -149,28 +141,12 @@ type Manager interface { // SetPodResizeStatus caches the last resizing decision for the pod. SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) - - allocationManager -} - -// TODO(tallclair): Refactor allocation state handling out of the status manager. -type allocationManager interface { - // GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container - GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) - - // UpdatePodFromAllocation overwrites the pod spec with the allocation. - // This function does a deep copy only if updates are needed. - // Returns the updated (or original) pod, and whether there was an allocation stored. - UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) - - // SetPodAllocation checkpoints the resources allocated to a pod's containers. - SetPodAllocation(pod *v1.Pod) error } const syncPeriod = 10 * time.Second // NewManager returns a functional Manager. -func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager { +func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper) Manager { return &manager{ kubeClient: kubeClient, podManager: podManager, @@ -180,7 +156,6 @@ func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeleti apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64), podDeletionSafety: podDeletionSafety, podStartupLatencyHelper: podStartupLatencyHelper, - stateFileDirectory: stateFileDirectory, } } @@ -204,20 +179,6 @@ func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool { } func (m *manager) Start() { - // Initialize m.state to no-op state checkpoint manager - m.state = state.NewNoopStateCheckpoint() - - // Create pod allocation checkpoint manager even if client is nil so as to allow local get/set of AllocatedResources & Resize - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - stateImpl, err := state.NewStateCheckpoint(m.stateFileDirectory, podStatusManagerStateFile) - if err != nil { - // This is a crictical, non-recoverable failure. - klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file") - panic(err) - } - m.state = stateImpl - } - // Don't start the status manager if we don't have a client. 
This will happen // on the master, where the kubelet is responsible for bootstrapping the pods // of the master components. @@ -246,61 +207,6 @@ func (m *manager) Start() { }, 0) } -// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values -// If checkpoint manager has not been initialized, it returns nil, false -func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { - m.podStatusesLock.RLock() - defer m.podStatusesLock.RUnlock() - return m.state.GetContainerResourceAllocation(podUID, containerName) -} - -// UpdatePodFromAllocation overwrites the pod spec with the allocation. -// This function does a deep copy only if updates are needed. -func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { - m.podStatusesLock.RLock() - defer m.podStatusesLock.RUnlock() - // TODO(tallclair): This clones the whole cache, but we only need 1 pod. - allocs := m.state.GetPodResourceAllocation() - return updatePodFromAllocation(pod, allocs) -} - -func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) { - allocated, found := allocs[string(pod.UID)] - if !found { - return pod, false - } - - updated := false - containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) { - if cAlloc, ok := allocated[c.Name]; ok { - if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) { - // Allocation differs from pod spec, retrieve the allocation - if !updated { - // If this is the first update to be performed, copy the pod - pod = pod.DeepCopy() - updated = true - } - return cAlloc, true - } - } - return v1.ResourceRequirements{}, false - } - - for i, c := range pod.Spec.Containers { - if cAlloc, found := containerAlloc(c); found { - // Allocation differs from pod spec, update - pod.Spec.Containers[i].Resources = cAlloc - } - } - for i, c := range pod.Spec.InitContainers { - if cAlloc, found := containerAlloc(c); found { - // Allocation differs from pod spec, update - pod.Spec.InitContainers[i].Resources = cAlloc - } - } - return pod, updated -} - // GetPodResizeStatus returns the last cached ResizeStatus value. func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus { m.podStatusesLock.RLock() @@ -308,29 +214,6 @@ func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus { return m.podResizeStatuses[podUID] } -// SetPodAllocation checkpoints the resources allocated to a pod's containers -func (m *manager) SetPodAllocation(pod *v1.Pod) error { - m.podStatusesLock.RLock() - defer m.podStatusesLock.RUnlock() - - podAlloc := make(map[string]v1.ResourceRequirements) - for _, container := range pod.Spec.Containers { - alloc := *container.Resources.DeepCopy() - podAlloc[container.Name] = alloc - } - - if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) { - for _, container := range pod.Spec.InitContainers { - if podutil.IsRestartableInitContainer(&container) { - alloc := *container.Resources.DeepCopy() - podAlloc[container.Name] = alloc - } - } - } - - return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc) -} - // SetPodResizeStatus checkpoints the last resizing decision for the pod. 
 func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
 	m.podStatusesLock.Lock()
@@ -806,7 +689,6 @@ func (m *manager) deletePodStatus(uid types.UID) {
 	m.podStartupLatencyHelper.DeletePodStartupState(uid)
 	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 		delete(m.podResizeStatuses, uid)
-		m.state.Delete(string(uid), "")
 	}
 }
 
@@ -820,7 +702,6 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 			delete(m.podStatuses, key)
 			if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 				delete(m.podResizeStatuses, key)
-				m.state.Delete(string(key), "")
 			}
 		}
 	}
diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go
index d94a82741cb..dc0ed827a97 100644
--- a/pkg/kubelet/status/status_manager_test.go
+++ b/pkg/kubelet/status/status_manager_test.go
@@ -19,7 +19,6 @@ package status
 import (
 	"fmt"
 	"math/rand"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
@@ -32,7 +31,6 @@
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -43,7 +41,6 @@
 	api "k8s.io/kubernetes/pkg/apis/core"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
-	"k8s.io/kubernetes/pkg/kubelet/status/state"
 	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -92,13 +89,7 @@ func newTestManager(kubeClient clientset.Interface) *manager {
 	podManager := kubepod.NewBasicPodManager()
 	podManager.(mutablePodManager).AddPod(getTestPod())
 	podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
-	testRootDir := ""
-	if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-		return nil
-	} else {
-		testRootDir = tempDir
-	}
-	return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir).(*manager)
+	return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
 }
 
 func generateRandomMessage() string {
@@ -1088,7 +1079,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			podManager := kubepod.NewBasicPodManager()
 			podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
-			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
+			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
 
 			original := tc.pod.DeepCopy()
 			syncer.SetPodStatus(original, original.Status)
@@ -1174,7 +1165,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			podManager := kubepod.NewBasicPodManager()
 			podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
-			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
+			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
 
 			pod := getTestPod()
 			pod.Status = tc.status
@@ -2036,143 +2027,6 @@ func TestMergePodStatus(t *testing.T) {
 
 }
 
-func TestUpdatePodFromAllocation(t *testing.T) {
-	containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
-	pod := &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			UID:       "12345",
-			Name:      "test",
-			Namespace: "default",
-		},
-		Spec: v1.PodSpec{
-			Containers: []v1.Container{
-				{
-					Name: "c1",
-					Resources: v1.ResourceRequirements{
-						Requests: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
-						},
-						Limits: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
-						},
-					},
-				},
-				{
-					Name: "c2",
-					Resources: v1.ResourceRequirements{
-						Requests: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
-						},
-						Limits: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
-						},
-					},
-				},
-			},
-			InitContainers: []v1.Container{
-				{
-					Name: "c1-restartable-init",
-					Resources: v1.ResourceRequirements{
-						Requests: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
-						},
-						Limits: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
-						},
-					},
-					RestartPolicy: &containerRestartPolicyAlways,
-				},
-				{
-					Name: "c1-init",
-					Resources: v1.ResourceRequirements{
-						Requests: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
-						},
-						Limits: v1.ResourceList{
-							v1.ResourceCPU:    *resource.NewMilliQuantity(700, resource.DecimalSI),
-							v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
-						},
-					},
-				},
-			},
-		},
-	}
-
-	resizedPod := pod.DeepCopy()
-	resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
-	resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
-
-	tests := []struct {
-		name         string
-		pod          *v1.Pod
-		allocs       state.PodResourceAllocation
-		expectPod    *v1.Pod
-		expectUpdate bool
-	}{{
-		name: "steady state",
-		pod:  pod,
-		allocs: state.PodResourceAllocation{
-			string(pod.UID): map[string]v1.ResourceRequirements{
-				"c1":                  *pod.Spec.Containers[0].Resources.DeepCopy(),
-				"c2":                  *pod.Spec.Containers[1].Resources.DeepCopy(),
-				"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
-				"c1-init":             *pod.Spec.InitContainers[1].Resources.DeepCopy(),
-			},
-		},
-		expectUpdate: false,
-	}, {
-		name:         "no allocations",
-		pod:          pod,
-		allocs:       state.PodResourceAllocation{},
-		expectUpdate: false,
-	}, {
-		name: "missing container allocation",
-		pod:  pod,
-		allocs: state.PodResourceAllocation{
-			string(pod.UID): map[string]v1.ResourceRequirements{
-				"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
-			},
-		},
-		expectUpdate: false,
-	}, {
-		name: "resized container",
-		pod:  pod,
-		allocs: state.PodResourceAllocation{
-			string(pod.UID): map[string]v1.ResourceRequirements{
-				"c1":                  *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
-				"c2":                  *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
-				"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
-				"c1-init":             *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
-			},
-		},
-		expectUpdate: true,
-		expectPod:    resizedPod,
-	}}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			pod := test.pod.DeepCopy()
-			allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
-
-			if test.expectUpdate {
-				assert.True(t, updated, "updated")
-				assert.Equal(t, test.expectPod, allocatedPod)
-				assert.NotEqual(t, pod, allocatedPod)
-			} else {
-				assert.False(t, updated, "updated")
-				assert.Same(t, pod, allocatedPod)
-			}
-		})
-	}
-}
-
 func statusEqual(left, right v1.PodStatus) bool {
 	left.Conditions = nil
 	right.Conditions = nil