From 53aa7277088cc938ef575fcbbc73103c5004a562 Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Thu, 17 Oct 2024 22:28:53 -0700 Subject: [PATCH] Checkpoint allocated requests and limits --- pkg/kubelet/kubelet.go | 11 +-- pkg/kubelet/kubelet_pods.go | 7 +- pkg/kubelet/kubelet_test.go | 70 ++++++++++---------- pkg/kubelet/status/fake_status_manager.go | 6 +- pkg/kubelet/status/state/checkpoint.go | 10 +-- pkg/kubelet/status/state/state.go | 12 ++-- pkg/kubelet/status/state/state_checkpoint.go | 14 ++-- pkg/kubelet/status/state/state_mem.go | 10 +-- pkg/kubelet/status/status_manager.go | 9 +-- 9 files changed, 72 insertions(+), 77 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 01a723987f8..7201d7ea4df 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2612,15 +2612,10 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // updateContainerResourceAllocation updates AllocatedResources values // (for cpu & memory) from checkpoint store func (kl *Kubelet) updateContainerResourceAllocation(pod *v1.Pod) { - for _, c := range pod.Spec.Containers { + for i, c := range pod.Spec.Containers { allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name) - if c.Resources.Requests != nil && found { - if _, ok := allocatedResources[v1.ResourceCPU]; ok { - c.Resources.Requests[v1.ResourceCPU] = allocatedResources[v1.ResourceCPU] - } - if _, ok := allocatedResources[v1.ResourceMemory]; ok { - c.Resources.Requests[v1.ResourceMemory] = allocatedResources[v1.ResourceMemory] - } + if found { + pod.Spec.Containers[i].Resources = allocatedResources } } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index ff872985ce5..c2cf2acc0be 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -2082,9 +2082,10 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon } container := kubecontainer.GetContainerSpec(pod, cName) // 
AllocatedResources values come from checkpoint. It is the source-of-truth. - found := false - status.AllocatedResources, found = kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName) - if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found { + alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName) + if found { + status.AllocatedResources = alloc.Requests + } else if !(container.Resources.Requests == nil && container.Resources.Limits == nil) { // Log error and fallback to AllocatedResources in oldStatus if it exists klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName) if oldStatusFound { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 27c47ed9015..05ddb14feba 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2447,8 +2447,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Having both memory and cpu, resource allocation not exists", pod: podWithUIDNameNsSpec("1", "pod1", "foo", *cpu500mMem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "1": map[string]v1.ResourceList{ - cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests, + "1": map[string]v1.ResourceRequirements{ + cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2457,8 +2457,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "2": map[string]v1.ResourceList{ - cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests, + "2": map[string]v1.ResourceRequirements{ + cpu500mMem500MPodSpec.Containers[0].Name: 
cpu500mMem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2467,8 +2467,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu500mMem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu800mMem800MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "3": map[string]v1.ResourceList{ - cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources.Requests, + "3": map[string]v1.ResourceRequirements{ + cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources, }, }, }, @@ -2476,8 +2476,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has cpu, resource allocation not exists", pod: podWithUIDNameNsSpec("4", "pod5", "foo", *cpu500mPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "4": map[string]v1.ResourceList{ - cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests, + "4": map[string]v1.ResourceRequirements{ + cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, }, }, }, @@ -2486,8 +2486,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec), existingPodAllocation: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "5": map[string]v1.ResourceList{ - cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests, + "5": map[string]v1.ResourceRequirements{ + cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, }, }, }, @@ -2496,8 +2496,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu500mPodSpec), existingPodAllocation: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu800mPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "6": map[string]v1.ResourceList{ - 
cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources.Requests, + "6": map[string]v1.ResourceRequirements{ + cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources, }, }, }, @@ -2505,8 +2505,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has memory, resource allocation not exists", pod: podWithUIDNameNsSpec("7", "pod7", "foo", *mem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "7": map[string]v1.ResourceList{ - mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests, + "7": map[string]v1.ResourceRequirements{ + mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2515,8 +2515,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "8": map[string]v1.ResourceList{ - mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests, + "8": map[string]v1.ResourceRequirements{ + mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2525,8 +2525,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("9", "pod9", "foo", *mem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("9", "pod9", "foo", *mem800MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "9": map[string]v1.ResourceList{ - mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources.Requests, + "9": map[string]v1.ResourceRequirements{ + mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources, }, }, }, @@ -2534,8 +2534,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "No CPU and memory, resource allocation not exists", pod: podWithUIDNameNsSpec("10", "pod10", "foo", *emptyPodSpec), 
expectedPodResourceAllocation: state.PodResourceAllocation{ - "10": map[string]v1.ResourceList{ - emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests, + "10": map[string]v1.ResourceRequirements{ + emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, }, }, }, @@ -2544,27 +2544,29 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec), existingPodAllocation: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "11": map[string]v1.ResourceList{ - emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests, + "11": map[string]v1.ResourceRequirements{ + emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, }, }, }, } for _, tc := range tests { - if tc.existingPodAllocation != nil { - // when kubelet restarts, AllocatedResources has already existed before adding pod - err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation) - if err != nil { - t.Fatalf("failed to set pod allocation: %v", err) + t.Run(tc.name, func(t *testing.T) { + if tc.existingPodAllocation != nil { + // when kubelet restarts, AllocatedResources has already existed before adding pod + err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation) + if err != nil { + t.Fatalf("failed to set pod allocation: %v", err) + } } - } - kubelet.HandlePodAdditions([]*v1.Pod{tc.pod}) + kubelet.HandlePodAdditions([]*v1.Pod{tc.pod}) - allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name) - if !found { - t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) - } - assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) + allocatedResources, found := 
kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name) + if !found { + t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) + } + assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) + }) } } diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go index 42cd611984e..c421883e176 100644 --- a/pkg/kubelet/status/fake_status_manager.go +++ b/pkg/kubelet/status/fake_status_manager.go @@ -63,7 +63,7 @@ func (m *fakeManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { return } -func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { klog.InfoS("GetContainerResourceAllocation()") return m.state.GetContainerResourceAllocation(podUID, containerName) } @@ -76,9 +76,9 @@ func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, boo func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error { klog.InfoS("SetPodAllocation()") for _, container := range pod.Spec.Containers { - var alloc v1.ResourceList + var alloc v1.ResourceRequirements if container.Resources.Requests != nil { - alloc = container.Resources.Requests.DeepCopy() + alloc = *container.Resources.DeepCopy() } m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc) } diff --git a/pkg/kubelet/status/state/checkpoint.go b/pkg/kubelet/status/state/checkpoint.go index 6cad6361e28..525bf675860 100644 --- a/pkg/kubelet/status/state/checkpoint.go +++ b/pkg/kubelet/status/state/checkpoint.go @@ -19,7 +19,7 @@ package state import ( "encoding/json" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" 
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) @@ -28,16 +28,16 @@ var _ checkpointmanager.Checkpoint = &PodResourceAllocationCheckpoint{} // PodResourceAllocationCheckpoint is used to store resources allocated to a pod in checkpoint type PodResourceAllocationCheckpoint struct { - AllocationEntries map[string]map[string]v1.ResourceList `json:"allocationEntries,omitempty"` - ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"` - Checksum checksum.Checksum `json:"checksum"` + AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` + ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"` + Checksum checksum.Checksum `json:"checksum"` } // NewPodResourceAllocationCheckpoint returns an instance of Checkpoint func NewPodResourceAllocationCheckpoint() *PodResourceAllocationCheckpoint { //lint:ignore unexported-type-in-api user-facing error message return &PodResourceAllocationCheckpoint{ - AllocationEntries: make(map[string]map[string]v1.ResourceList), + AllocationEntries: make(map[string]map[string]v1.ResourceRequirements), ResizeStatusEntries: make(map[string]v1.PodResizeStatus), } } diff --git a/pkg/kubelet/status/state/state.go b/pkg/kubelet/status/state/state.go index 2fdbe8a4474..4b85f164ab8 100644 --- a/pkg/kubelet/status/state/state.go +++ b/pkg/kubelet/status/state/state.go @@ -17,11 +17,11 @@ limitations under the License. 
package state import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) // PodResourceAllocation type is used in tracking resources allocated to pod's containers -type PodResourceAllocation map[string]map[string]v1.ResourceList +type PodResourceAllocation map[string]map[string]v1.ResourceRequirements // PodResizeStatus type is used in tracking the last resize decision for pod type PodResizeStatus map[string]v1.PodResizeStatus @@ -30,9 +30,9 @@ type PodResizeStatus map[string]v1.PodResizeStatus func (pr PodResourceAllocation) Clone() PodResourceAllocation { prCopy := make(PodResourceAllocation) for pod := range pr { - prCopy[pod] = make(map[string]v1.ResourceList) + prCopy[pod] = make(map[string]v1.ResourceRequirements) for container, alloc := range pr[pod] { - prCopy[pod][container] = alloc.DeepCopy() + prCopy[pod][container] = *alloc.DeepCopy() } } return prCopy @@ -40,14 +40,14 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation { // Reader interface used to read current pod resource allocation state type Reader interface { - GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) + GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) GetPodResourceAllocation() PodResourceAllocation GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) GetResizeStatus() PodResizeStatus } type writer interface { - SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error + SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error SetPodResourceAllocation(PodResourceAllocation) error SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error SetResizeStatus(PodResizeStatus) error diff --git a/pkg/kubelet/status/state/state_checkpoint.go b/pkg/kubelet/status/state/state_checkpoint.go index f0f5b2b7696..3230b2adcc4 100644 --- a/pkg/kubelet/status/state/state_checkpoint.go +++ 
b/pkg/kubelet/status/state/state_checkpoint.go @@ -21,7 +21,7 @@ import ( "path" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" @@ -82,7 +82,7 @@ func (sc *stateCheckpoint) storeState() error { podAllocation := sc.cache.GetPodResourceAllocation() for pod := range podAllocation { - checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceList) + checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceRequirements) for container, alloc := range podAllocation[pod] { checkpoint.AllocationEntries[pod][container] = alloc } @@ -103,7 +103,7 @@ func (sc *stateCheckpoint) storeState() error { } // GetContainerResourceAllocation returns current resources allocated to a pod's container -func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { sc.mux.RLock() defer sc.mux.RUnlock() return sc.cache.GetContainerResourceAllocation(podUID, containerName) @@ -131,7 +131,7 @@ func (sc *stateCheckpoint) GetResizeStatus() PodResizeStatus { } // SetContainerResourceAllocation sets resources allocated to a pod's container -func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error { +func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { sc.mux.Lock() defer sc.mux.Unlock() sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc) @@ -185,8 +185,8 @@ func NewNoopStateCheckpoint() State { return &noopStateCheckpoint{} } -func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ string, _ string) (v1.ResourceList, bool) { - return nil, false +func (sc *noopStateCheckpoint) 
GetContainerResourceAllocation(_ string, _ string) (v1.ResourceRequirements, bool) { + return v1.ResourceRequirements{}, false } func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation { @@ -201,7 +201,7 @@ func (sc *noopStateCheckpoint) GetResizeStatus() PodResizeStatus { return nil } -func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceList) error { +func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error { return nil } diff --git a/pkg/kubelet/status/state/state_mem.go b/pkg/kubelet/status/state/state_mem.go index 6a4047b1739..8e90c06f554 100644 --- a/pkg/kubelet/status/state/state_mem.go +++ b/pkg/kubelet/status/state/state_mem.go @@ -19,7 +19,7 @@ package state import ( "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" ) @@ -40,12 +40,12 @@ func NewStateMemory() State { } } -func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { s.RLock() defer s.RUnlock() alloc, ok := s.podAllocation[podUID][containerName] - return alloc.DeepCopy(), ok + return *alloc.DeepCopy(), ok } func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation { @@ -72,12 +72,12 @@ func (s *stateMemory) GetResizeStatus() PodResizeStatus { return prs } -func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error { +func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { s.Lock() defer s.Unlock() if _, ok := s.podAllocation[podUID]; !ok { - s.podAllocation[podUID] = make(map[string]v1.ResourceList) + s.podAllocation[podUID] = make(map[string]v1.ResourceRequirements) } s.podAllocation[podUID][containerName] = alloc diff --git 
a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index c7cb740586e..1e23838b9cf 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -144,7 +144,7 @@ type Manager interface { RemoveOrphanedStatuses(podUIDs map[types.UID]bool) // GetContainerResourceAllocation returns checkpointed AllocatedResources value for the container - GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) + GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) // GetPodResizeStatus returns checkpointed PodStatus.Resize value GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) @@ -236,7 +236,7 @@ func (m *manager) Start() { // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values -// If checkpoint manager has not been initialized, it returns nil, false -func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +// If checkpoint manager has not been initialized, it returns an empty ResourceRequirements, false +func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() return m.state.GetContainerResourceAllocation(podUID, containerName) @@ -255,10 +255,7 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() for _, container := range pod.Spec.Containers { - var alloc v1.ResourceList - if container.Resources.Requests != nil { - alloc = container.Resources.Requests.DeepCopy() - } + alloc := *container.Resources.DeepCopy() if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil { return err }