Change allocation manager pod UID to types.UID

This commit is contained in:
Tim Allclair 2025-03-04 13:54:27 -08:00
parent 6c445ca18a
commit 8fac9c68e1
9 changed files with 36 additions and 37 deletions

View File

@ -34,7 +34,7 @@ const podStatusManagerStateFile = "pod_status_manager_state"
// AllocationManager tracks pod resource allocations. // AllocationManager tracks pod resource allocations.
type Manager interface { type Manager interface {
// GetContainerResourceAllocation returns the AllocatedResources value for the container // GetContainerResourceAllocation returns the AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
// UpdatePodFromAllocation overwrites the pod spec with the allocation. // UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed. // This function does a deep copy only if updates are needed.
@ -83,7 +83,7 @@ func NewInMemoryManager() Manager {
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false // If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
return m.state.GetContainerResourceAllocation(podUID, containerName) return m.state.GetContainerResourceAllocation(podUID, containerName)
} }
@ -96,7 +96,7 @@ func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
} }
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) { func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
allocated, found := allocs[string(pod.UID)] allocated, found := allocs[pod.UID]
if !found { if !found {
return pod, false return pod, false
} }
@ -149,11 +149,11 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
} }
} }
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc) return m.state.SetPodResourceAllocation(pod.UID, podAlloc)
} }
func (m *manager) DeletePodAllocation(uid types.UID) { func (m *manager) DeletePodAllocation(uid types.UID) {
if err := m.state.Delete(string(uid), ""); err != nil { if err := m.state.Delete(uid, ""); err != nil {
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
} }

View File

@ -110,7 +110,7 @@ func TestUpdatePodFromAllocation(t *testing.T) {
name: "steady state", name: "steady state",
pod: pod, pod: pod,
allocs: state.PodResourceAllocation{ allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{ pod.UID: map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(), "c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(), "c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(), "c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
@ -127,7 +127,7 @@ func TestUpdatePodFromAllocation(t *testing.T) {
name: "missing container allocation", name: "missing container allocation",
pod: pod, pod: pod,
allocs: state.PodResourceAllocation{ allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{ pod.UID: map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(), "c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
}, },
}, },
@ -136,7 +136,7 @@ func TestUpdatePodFromAllocation(t *testing.T) {
name: "resized container", name: "resized container",
pod: pod, pod: pod,
allocs: state.PodResourceAllocation{ allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{ pod.UID: map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(), "c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(), "c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(), "c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
) )
@ -28,7 +29,7 @@ import (
var _ checkpointmanager.Checkpoint = &Checkpoint{} var _ checkpointmanager.Checkpoint = &Checkpoint{}
type PodResourceAllocationInfo struct { type PodResourceAllocationInfo struct {
AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` AllocationEntries map[types.UID]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
} }
// Checkpoint represents a structure to store pod resource allocation checkpoint data // Checkpoint represents a structure to store pod resource allocation checkpoint data

View File

@ -23,10 +23,7 @@ import (
) )
// PodResourceAllocation type is used in tracking resources allocated to pod's containers // PodResourceAllocation type is used in tracking resources allocated to pod's containers
type PodResourceAllocation map[string]map[string]v1.ResourceRequirements type PodResourceAllocation map[types.UID]map[string]v1.ResourceRequirements
// PodResizeStatus type is used in tracking the last resize decision for pod
type PodResizeStatus map[string]v1.PodResizeStatus
// Clone returns a copy of PodResourceAllocation // Clone returns a copy of PodResourceAllocation
func (pr PodResourceAllocation) Clone() PodResourceAllocation { func (pr PodResourceAllocation) Clone() PodResourceAllocation {
@ -42,14 +39,14 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation {
// Reader interface used to read current pod resource allocation state // Reader interface used to read current pod resource allocation state
type Reader interface { type Reader interface {
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
GetPodResourceAllocation() PodResourceAllocation GetPodResourceAllocation() PodResourceAllocation
} }
type writer interface { type writer interface {
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error
Delete(podUID string, containerName string) error Delete(podUID types.UID, containerName string) error
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID]) RemoveOrphanedPods(remainingPods sets.Set[types.UID])
} }

View File

@ -68,7 +68,7 @@ func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpo
if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
return &PodResourceAllocationInfo{ return &PodResourceAllocationInfo{
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements), AllocationEntries: make(map[types.UID]map[string]v1.ResourceRequirements),
}, nil }, nil
} }
return nil, err return nil, err
@ -101,7 +101,7 @@ func (sc *stateCheckpoint) storeState() error {
} }
// GetContainerResourceAllocation returns current resources allocated to a pod's container // GetContainerResourceAllocation returns current resources allocated to a pod's container
func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
sc.mux.RLock() sc.mux.RLock()
defer sc.mux.RUnlock() defer sc.mux.RUnlock()
return sc.cache.GetContainerResourceAllocation(podUID, containerName) return sc.cache.GetContainerResourceAllocation(podUID, containerName)
@ -115,7 +115,7 @@ func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
} }
// SetContainerResourceAllocation sets resources allocated to a pod's container // SetContainerResourceAllocation sets resources allocated to a pod's container
func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc) sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc)
@ -123,7 +123,7 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain
} }
// SetPodResourceAllocation sets pod resource allocation // SetPodResourceAllocation sets pod resource allocation
func (sc *stateCheckpoint) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error { func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
err := sc.cache.SetPodResourceAllocation(podUID, alloc) err := sc.cache.SetPodResourceAllocation(podUID, alloc)
@ -134,7 +134,7 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(podUID string, alloc map[str
} }
// Delete deletes allocations for specified pod // Delete deletes allocations for specified pod
func (sc *stateCheckpoint) Delete(podUID string, containerName string) error { func (sc *stateCheckpoint) Delete(podUID types.UID, containerName string) error {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
sc.cache.Delete(podUID, containerName) sc.cache.Delete(podUID, containerName)
@ -154,7 +154,7 @@ func NewNoopStateCheckpoint() State {
return &noopStateCheckpoint{} return &noopStateCheckpoint{}
} }
func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ string, _ string) (v1.ResourceRequirements, bool) { func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
return v1.ResourceRequirements{}, false return v1.ResourceRequirements{}, false
} }
@ -162,15 +162,15 @@ func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation
return nil return nil
} }
func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error { func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ types.UID, _ string, _ v1.ResourceRequirements) error {
return nil return nil
} }
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v1.ResourceRequirements) error { func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[string]v1.ResourceRequirements) error {
return nil return nil
} }
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error { func (sc *noopStateCheckpoint) Delete(_ types.UID, _ string) error {
return nil return nil
} }

View File

@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
) )
@ -138,7 +139,7 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
// pretend that the old checkpoint is unaware for the field ResizeStatusEntries // pretend that the old checkpoint is unaware for the field ResizeStatusEntries
const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}` const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{ expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{
AllocationEntries: map[string]map[string]v1.ResourceRequirements{ AllocationEntries: map[types.UID]map[string]v1.ResourceRequirements{
"pod1": { "pod1": {
"container1": { "container1": {
Requests: v1.ResourceList{ Requests: v1.ResourceList{

View File

@ -43,7 +43,7 @@ func NewStateMemory(alloc PodResourceAllocation) State {
} }
} }
func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { func (s *stateMemory) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
@ -57,7 +57,7 @@ func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation {
return s.podAllocation.Clone() return s.podAllocation.Clone()
} }
func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { func (s *stateMemory) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -70,7 +70,7 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam
return nil return nil
} }
func (s *stateMemory) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error { func (s *stateMemory) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -79,7 +79,7 @@ func (s *stateMemory) SetPodResourceAllocation(podUID string, alloc map[string]v
return nil return nil
} }
func (s *stateMemory) deleteContainer(podUID string, containerName string) { func (s *stateMemory) deleteContainer(podUID types.UID, containerName string) {
delete(s.podAllocation[podUID], containerName) delete(s.podAllocation[podUID], containerName)
if len(s.podAllocation[podUID]) == 0 { if len(s.podAllocation[podUID]) == 0 {
delete(s.podAllocation, podUID) delete(s.podAllocation, podUID)
@ -87,7 +87,7 @@ func (s *stateMemory) deleteContainer(podUID string, containerName string) {
klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID, "containerName", containerName) klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID, "containerName", containerName)
} }
func (s *stateMemory) Delete(podUID string, containerName string) error { func (s *stateMemory) Delete(podUID types.UID, containerName string) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if len(containerName) == 0 { if len(containerName) == 0 {

View File

@ -2153,7 +2153,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
// Always set the status to the latest allocated resources, even if it differs from the // Always set the status to the latest allocated resources, even if it differs from the
// allocation used by the current sync loop. // allocation used by the current sync loop.
alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName) alloc, found := kl.allocationManager.GetContainerResourceAllocation(pod.UID, cName)
if !found { if !found {
// This case is expected for non-resizable containers (ephemeral & non-restartable init containers). // This case is expected for non-resizable containers (ephemeral & non-restartable init containers).
// Don't set status.Resources in this case. // Don't set status.Resources in this case.
@ -2373,7 +2373,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
status.Resources = convertContainerStatusResources(cName, status, cStatus, oldStatuses) status.Resources = convertContainerStatusResources(cName, status, cStatus, oldStatuses)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingAllocatedStatus) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingAllocatedStatus) {
if alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName); found { if alloc, found := kl.allocationManager.GetContainerResourceAllocation(pod.UID, cName); found {
status.AllocatedResources = alloc.Requests status.AllocatedResources = alloc.Requests
} }
} }

View File

@ -2575,11 +2575,11 @@ func TestPodResourceAllocationReset(t *testing.T) {
} }
kubelet.HandlePodAdditions([]*v1.Pod{tc.pod}) kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name) allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(tc.pod.UID, tc.pod.Spec.Containers[0].Name)
if !found { if !found {
t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
} }
assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) assert.Equal(t, tc.expectedPodResourceAllocation[tc.pod.UID][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name)
}) })
} }
} }
@ -2953,7 +2953,7 @@ func TestHandlePodResourcesResize(t *testing.T) {
assert.Equal(t, tt.expectedAllocatedReqs, updatedPodCtr.Resources.Requests, "updated pod spec requests") assert.Equal(t, tt.expectedAllocatedReqs, updatedPodCtr.Resources.Requests, "updated pod spec requests")
assert.Equal(t, tt.expectedAllocatedLims, updatedPodCtr.Resources.Limits, "updated pod spec limits") assert.Equal(t, tt.expectedAllocatedLims, updatedPodCtr.Resources.Limits, "updated pod spec limits")
alloc, found := kubelet.allocationManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name) alloc, found := kubelet.allocationManager.GetContainerResourceAllocation(newPod.UID, updatedPodCtr.Name)
require.True(t, found, "container allocation") require.True(t, found, "container allocation")
assert.Equal(t, tt.expectedAllocatedReqs, alloc.Requests, "stored container request allocation") assert.Equal(t, tt.expectedAllocatedReqs, alloc.Requests, "stored container request allocation")
assert.Equal(t, tt.expectedAllocatedLims, alloc.Limits, "stored container limit allocation") assert.Equal(t, tt.expectedAllocatedLims, alloc.Limits, "stored container limit allocation")