From 091b450057c3da72eae4445a7455cb66a477469f Mon Sep 17 00:00:00 2001
From: ndixita
Date: Wed, 12 Mar 2025 23:45:54 +0000
Subject: [PATCH] Replace PodResourceAllocation with the PodResourceInfoMap
 type to allow extension for pod-level resources support

---
 pkg/kubelet/allocation/allocation_manager.go | 25 ++---
 .../allocation/allocation_manager_test.go | 40 ++++----
 pkg/kubelet/allocation/state/checkpoint.go | 14 ++-
 pkg/kubelet/allocation/state/state.go | 39 +++++---
 .../allocation/state/state_checkpoint.go | 71 +++++++-------
 .../allocation/state/state_checkpoint_test.go | 64 ++++++------
 pkg/kubelet/allocation/state/state_mem.go | 58 ++++++-----
 pkg/kubelet/kubelet_test.go | 98 ++++++++++++-------
 8 files changed, 232 insertions(+), 177 deletions(-)

diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go
index dac8f2f1fd0..0444d16728f 100644
--- a/pkg/kubelet/allocation/allocation_manager.go
+++ b/pkg/kubelet/allocation/allocation_manager.go
@@ -103,18 +103,18 @@ func NewInMemoryManager() Manager {
 // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
 // If checkpoint manager has not been initialized, it returns nil, false
 func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
-	return m.allocated.GetContainerResourceAllocation(podUID, containerName)
+	return m.allocated.GetContainerResources(podUID, containerName)
 }
 
 // UpdatePodFromAllocation overwrites the pod spec with the allocation.
 // This function does a deep copy only if updates are needed.
 func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
 	// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
-	allocs := m.allocated.GetPodResourceAllocation()
+	allocs := m.allocated.GetPodResourceInfoMap()
 	return updatePodFromAllocation(pod, allocs)
 }
 
-func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
+func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceInfoMap) (*v1.Pod, bool) {
 	allocated, found := allocs[pod.UID]
 	if !found {
 		return pod, false
@@ -122,7 +122,7 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
 	updated := false
 	containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
-		if cAlloc, ok := allocated[c.Name]; ok {
+		if cAlloc, ok := allocated.ContainerResources[c.Name]; ok {
 			if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
 				// Allocation differs from pod spec, retrieve the allocation
 				if !updated {
@@ -153,21 +153,22 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
 // SetAllocatedResources checkpoints the resources allocated to a pod's containers
 func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
-	return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod))
+	return m.allocated.SetPodResourceInfo(pod.UID, allocationFromPod(pod))
 }
 
-func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
-	podAlloc := make(map[string]v1.ResourceRequirements)
+func allocationFromPod(pod *v1.Pod) state.PodResourceInfo {
+	var podAlloc state.PodResourceInfo
+	podAlloc.ContainerResources = make(map[string]v1.ResourceRequirements)
 	for _, container := range pod.Spec.Containers {
 		alloc := *container.Resources.DeepCopy()
-		podAlloc[container.Name] = alloc
+		podAlloc.ContainerResources[container.Name] = alloc
 	}
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
for _, container := range pod.Spec.InitContainers { if podutil.IsRestartableInitContainer(&container) { alloc := *container.Resources.DeepCopy() - podAlloc[container.Name] = alloc + podAlloc.ContainerResources[container.Name] = alloc } } } @@ -195,12 +196,12 @@ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error { if actuatedContainer == nil { alloc := allocationFromPod(allocatedPod) - return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc) + return m.actuated.SetPodResourceInfo(allocatedPod.UID, alloc) } - return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources) + return m.actuated.SetContainerResources(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources) } func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { - return m.actuated.GetContainerResourceAllocation(podUID, containerName) + return m.actuated.GetContainerResources(podUID, containerName) } diff --git a/pkg/kubelet/allocation/allocation_manager_test.go b/pkg/kubelet/allocation/allocation_manager_test.go index bec031d3732..c9cbc177ec2 100644 --- a/pkg/kubelet/allocation/allocation_manager_test.go +++ b/pkg/kubelet/allocation/allocation_manager_test.go @@ -103,44 +103,50 @@ func TestUpdatePodFromAllocation(t *testing.T) { tests := []struct { name string pod *v1.Pod - allocs state.PodResourceAllocation + allocs state.PodResourceInfoMap expectPod *v1.Pod expectUpdate bool }{{ name: "steady state", pod: pod, - allocs: state.PodResourceAllocation{ - pod.UID: map[string]v1.ResourceRequirements{ - "c1": *pod.Spec.Containers[0].Resources.DeepCopy(), - "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), - "c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(), - "c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(), + allocs: state.PodResourceInfoMap{ + pod.UID: state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + "c1": *pod.Spec.Containers[0].Resources.DeepCopy(), + "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), + "c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(), + "c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(), + }, }, }, expectUpdate: false, }, { name: "no allocations", pod: pod, - allocs: state.PodResourceAllocation{}, + allocs: state.PodResourceInfoMap{}, expectUpdate: false, }, { name: "missing container allocation", pod: pod, - allocs: state.PodResourceAllocation{ - pod.UID: map[string]v1.ResourceRequirements{ - "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), + allocs: state.PodResourceInfoMap{ + pod.UID: state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), + }, }, }, expectUpdate: false, }, { name: "resized container", pod: pod, - allocs: state.PodResourceAllocation{ - pod.UID: map[string]v1.ResourceRequirements{ - "c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(), - "c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(), - "c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(), - "c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(), + allocs: state.PodResourceInfoMap{ + pod.UID: state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + "c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(), + "c2": 
*resizedPod.Spec.Containers[1].Resources.DeepCopy(),
+					"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
+					"c1-init":             *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
+				},
 			},
 		},
 		expectUpdate: true,
diff --git a/pkg/kubelet/allocation/state/checkpoint.go b/pkg/kubelet/allocation/state/checkpoint.go
index 34e7128b08d..b450302b02c 100644
--- a/pkg/kubelet/allocation/state/checkpoint.go
+++ b/pkg/kubelet/allocation/state/checkpoint.go
@@ -20,16 +20,14 @@ import (
 	"encoding/json"
 	"fmt"
 
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
 )
 
 var _ checkpointmanager.Checkpoint = &Checkpoint{}
 
-type PodResourceAllocationInfo struct {
-	AllocationEntries map[types.UID]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
+type PodResourceCheckpointInfo struct {
+	Entries PodResourceInfoMap `json:"entries,omitempty"`
 }
 
 // Checkpoint represents a structure to store pod resource allocation checkpoint data
@@ -41,7 +39,7 @@ type Checkpoint struct {
 }
 
 // NewCheckpoint creates a new checkpoint from a list of claim info states
-func NewCheckpoint(allocations *PodResourceAllocationInfo) (*Checkpoint, error) {
+func NewCheckpoint(allocations *PodResourceCheckpointInfo) (*Checkpoint, error) {
 	serializedAllocations, err := json.Marshal(allocations)
 	if err != nil {
@@ -70,9 +68,9 @@ func (cp *Checkpoint) VerifyChecksum() error {
 	return cp.Checksum.Verify(cp.Data)
 }
 
-// GetPodResourceAllocationInfo returns Pod Resource Allocation info states from checkpoint
-func (cp *Checkpoint) GetPodResourceAllocationInfo() (*PodResourceAllocationInfo, error) {
-	var data PodResourceAllocationInfo
+// GetPodResourceCheckpointInfo returns pod resource checkpoint info states from the checkpoint
+func (cp *Checkpoint) GetPodResourceCheckpointInfo() (*PodResourceCheckpointInfo, error) {
+	var data PodResourceCheckpointInfo
 	if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
 		return nil, err
 	}
diff --git a/pkg/kubelet/allocation/state/state.go b/pkg/kubelet/allocation/state/state.go
index 21d240294ab..96a2421f08f 100644
--- a/pkg/kubelet/allocation/state/state.go
+++ b/pkg/kubelet/allocation/state/state.go
@@ -22,36 +22,45 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 )
 
-// PodResourceAllocation type is used in tracking resources allocated to pod's containers
-type PodResourceAllocation map[types.UID]map[string]v1.ResourceRequirements
+// PodResourceInfo stores resource requirements for containers within a pod.
+type PodResourceInfo struct {
+	// ContainerResources maps container names to their respective ResourceRequirements.
+	ContainerResources map[string]v1.ResourceRequirements
+}
 
-// Clone returns a copy of PodResourceAllocation
-func (pr PodResourceAllocation) Clone() PodResourceAllocation {
-	prCopy := make(PodResourceAllocation)
-	for pod := range pr {
-		prCopy[pod] = make(map[string]v1.ResourceRequirements)
-		for container, alloc := range pr[pod] {
-			prCopy[pod][container] = *alloc.DeepCopy()
+// PodResourceInfoMap maps pod UIDs to their corresponding PodResourceInfo,
+// tracking resource requirements for all containers within each pod.
+type PodResourceInfoMap map[types.UID]PodResourceInfo
+
+// Clone returns a copy of PodResourceInfoMap
+func (pr PodResourceInfoMap) Clone() PodResourceInfoMap {
+	prCopy := make(PodResourceInfoMap)
+	for podUID, podInfo := range pr {
+		prCopy[podUID] = PodResourceInfo{
+			ContainerResources: make(map[string]v1.ResourceRequirements),
+		}
+		for containerName, containerInfo := range podInfo.ContainerResources {
+			prCopy[podUID].ContainerResources[containerName] = *containerInfo.DeepCopy()
 		}
 	}
 	return prCopy
 }
 
-// Reader interface used to read current pod resource allocation state
+// Reader interface used to read current pod resource state
 type Reader interface {
-	GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
-	GetPodResourceAllocation() PodResourceAllocation
+	GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
+	GetPodResourceInfoMap() PodResourceInfoMap
 }
 
 type writer interface {
-	SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error
-	SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error
+	SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error
+	SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error
 	RemovePod(podUID types.UID) error
 	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
 	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
 }
 
-// State interface provides methods for tracking and setting pod resource allocation
+// State interface provides methods for tracking and setting pod resources
 type State interface {
 	Reader
 	writer
diff --git a/pkg/kubelet/allocation/state/state_checkpoint.go b/pkg/kubelet/allocation/state/state_checkpoint.go
index df07cae051f..f6c5ce78c43 100644
--- a/pkg/kubelet/allocation/state/state_checkpoint.go
+++ b/pkg/kubelet/allocation/state/state_checkpoint.go
@@ -40,17 +40,17 @@ type stateCheckpoint struct {
 	lastChecksum checksum.Checksum
 }
 
-// NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend
+// NewStateCheckpoint creates a new State for tracking pod resource information with a checkpoint backend
 func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
 	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
 	if err != nil {
-		return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
+		return nil, fmt.Errorf("failed to initialize checkpoint manager for pod resource information tracking: %w", err)
 	}
 
 	pra, checksum, err := restoreState(checkpointManager, checkpointName)
 	if err != nil {
 		//lint:ignore ST1005 user-facing error message
-		return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
+		return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete the pod resource information checkpoint file %q before restarting Kubelet",
 			err, path.Join(stateDir, checkpointName))
 	}
@@ -64,7 +64,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
 }
 
 // restores state from a checkpoint and creates it if it doesn't exist
-func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) {
+func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceInfoMap, checksum.Checksum, error) {
 	checkpoint := &Checkpoint{}
 	if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
 		if err == errors.ErrCheckpointNotFound {
@@ -73,21 +73,21 @@ func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpo
 		return nil, 0, err
 	}
 
-	praInfo, err := checkpoint.GetPodResourceAllocationInfo()
+	praInfo, err := checkpoint.GetPodResourceCheckpointInfo()
 	if err != nil {
-		return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err)
+		return nil, 0, fmt.Errorf("failed to get pod resource information: %w", err)
 	}
 
-	klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
-	return praInfo.AllocationEntries, checkpoint.Checksum, nil
+	klog.V(2).InfoS("State checkpoint: restored pod resource state from checkpoint")
+	return praInfo.Entries, checkpoint.Checksum, nil
 }
 
 // saves state to a checkpoint, caller is responsible for locking
 func (sc *stateCheckpoint) storeState() error {
-	podAllocation := sc.cache.GetPodResourceAllocation()
+	resourceInfo := sc.cache.GetPodResourceInfoMap()
 
-	checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
-		AllocationEntries: podAllocation,
+	checkpoint, err := NewCheckpoint(&PodResourceCheckpointInfo{
+		Entries: resourceInfo,
 	})
 	if err != nil {
 		return fmt.Errorf("failed to create checkpoint: %w", err)
 	}
@@ -98,47 +98,50 @@ func (sc *stateCheckpoint) storeState() error {
 	}
 	err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
 	if err != nil {
-		klog.ErrorS(err, "Failed to save pod allocation checkpoint")
+		klog.ErrorS(err, "Failed to save pod resource information checkpoint")
 		return err
 	}
 	sc.lastChecksum = checkpoint.Checksum
 	return nil
 }
 
-// GetContainerResourceAllocation returns current resources allocated to a pod's container
-func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
+// GetContainerResources returns the current resource information for a pod's container
+func (sc *stateCheckpoint) GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
 	sc.mux.RLock()
 	defer sc.mux.RUnlock()
-	return sc.cache.GetContainerResourceAllocation(podUID, containerName)
+	return sc.cache.GetContainerResources(podUID, containerName)
 }
 
-// GetPodResourceAllocation returns current pod resource allocation
-func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
+// GetPodResourceInfoMap returns the current pod resource information
+func (sc *stateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
 	sc.mux.RLock()
 	defer sc.mux.RUnlock()
-	return sc.cache.GetPodResourceAllocation()
+	return sc.cache.GetPodResourceInfoMap()
 }
 
-// SetContainerResourceAllocation sets resources allocated to a pod's container
-func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error {
+// SetContainerResources sets the resource information for a pod's container
+func (sc *stateCheckpoint) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
-	sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc)
-	return sc.storeState()
-}
-
-// SetPodResourceAllocation sets pod resource allocation
-func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error {
-	sc.mux.Lock()
-	defer sc.mux.Unlock()
-	err := sc.cache.SetPodResourceAllocation(podUID, alloc)
+	err := sc.cache.SetContainerResources(podUID, containerName, resources)
 	if err != nil {
 		return err
 	}
 	return sc.storeState()
 }
 
-// Delete deletes allocations for specified pod
+// SetPodResourceInfo sets pod resource information
+func (sc *stateCheckpoint) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
+	sc.mux.Lock()
+	defer sc.mux.Unlock()
+	err := sc.cache.SetPodResourceInfo(podUID, resourceInfo)
+	if err != nil {
+		return err
+	}
+	return sc.storeState()
+}
+
+// RemovePod deletes resource information for the specified pod
 func (sc *stateCheckpoint) RemovePod(podUID types.UID) error {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
@@ -161,19 +164,19 @@ func NewNoopStateCheckpoint() State {
 	return &noopStateCheckpoint{}
 }
 
-func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
+func (sc *noopStateCheckpoint) GetContainerResources(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
 	return v1.ResourceRequirements{}, false
 }
 
-func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
+func (sc *noopStateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
 	return nil
 }
 
-func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ types.UID, _ string, _ v1.ResourceRequirements) error {
+func (sc *noopStateCheckpoint) SetContainerResources(_ types.UID, _ string, _ v1.ResourceRequirements) error {
 	return nil
 }
 
-func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[string]v1.ResourceRequirements) error {
+func (sc *noopStateCheckpoint) SetPodResourceInfo(_ types.UID, _ PodResourceInfo) error {
 	return nil
 }
diff --git a/pkg/kubelet/allocation/state/state_checkpoint_test.go b/pkg/kubelet/allocation/state/state_checkpoint_test.go
index e8c067faf9c..f409245fc40 100644
--- a/pkg/kubelet/allocation/state/state_checkpoint_test.go
+++ b/pkg/kubelet/allocation/state/state_checkpoint_test.go
@@ -33,7 +33,7 @@ const testCheckpoint = "pod_status_manager_state"
 
 func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
 	testingDir := getTestDir(t)
-	cache := NewStateMemory(PodResourceAllocation{})
+	cache := NewStateMemory(PodResourceInfoMap{})
 	checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
 	require.NoError(t, err, "failed to create checkpoint manager")
 	checkpointName := "pod_state_checkpoint"
@@ -56,12 +56,12 @@ func getTestDir(t *testing.T) string {
 	return testingDir
 }
 
-func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceAllocation, msgAndArgs string) {
-	for podUID, containerResourceList := range *expected {
-		require.Equal(t, len(containerResourceList), len((*actual)[podUID]), msgAndArgs)
-		for containerName, resourceList := range containerResourceList {
+func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceInfoMap, msgAndArgs string) {
+	for podUID, podResourceInfo := range *expected {
+		require.Equal(t, len(podResourceInfo.ContainerResources), len((*actual)[podUID].ContainerResources), msgAndArgs)
+		for containerName, resourceList := range podResourceInfo.ContainerResources {
 			for name, quantity := range resourceList.Requests {
-				require.True(t, quantity.Equal((*actual)[podUID][containerName].Requests[name]), msgAndArgs)
+				require.True(t,
quantity.Equal((*actual)[podUID].ContainerResources[containerName].Requests[name]), msgAndArgs) } } } @@ -69,7 +69,7 @@ func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceAllo func Test_stateCheckpoint_storeState(t *testing.T) { type args struct { - podResourceAllocation PodResourceAllocation + resInfoMap PodResourceInfoMap } tests := []struct { @@ -91,12 +91,14 @@ func Test_stateCheckpoint_storeState(t *testing.T) { }{ name: fmt.Sprintf("resource - %s%s", fact, suf), args: args{ - podResourceAllocation: PodResourceAllocation{ + resInfoMap: PodResourceInfoMap{ "pod1": { - "container1": { - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)), - v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)), + ContainerResources: map[string]v1.ResourceRequirements{ + "container1": { + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)), + v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)), + }, }, }, }, @@ -111,33 +113,35 @@ func Test_stateCheckpoint_storeState(t *testing.T) { originalSC, err := NewStateCheckpoint(testDir, testCheckpoint) require.NoError(t, err) - for podUID, alloc := range tt.args.podResourceAllocation { - err = originalSC.SetPodResourceAllocation(podUID, alloc) + for podUID, alloc := range tt.args.resInfoMap { + err = originalSC.SetPodResourceInfo(podUID, alloc) require.NoError(t, err) } - actual := originalSC.GetPodResourceAllocation() - verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation") + actual := originalSC.GetPodResourceInfoMap() + verifyPodResourceAllocation(t, &tt.args.resInfoMap, &actual, "stored pod resource allocation is not equal to original pod resource allocation") newSC, err := NewStateCheckpoint(testDir, testCheckpoint) require.NoError(t, err) - actual = newSC.GetPodResourceAllocation() - verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation") + actual = newSC.GetPodResourceInfoMap() + verifyPodResourceAllocation(t, &tt.args.resInfoMap, &actual, "restored pod resource allocation is not equal to original pod resource allocation") checkpointPath := filepath.Join(testDir, testCheckpoint) require.FileExists(t, checkpointPath) require.NoError(t, os.Remove(checkpointPath)) // Remove the checkpoint file to track whether it's re-written. // Setting the pod allocations to the same values should not re-write the checkpoint. - for podUID, alloc := range tt.args.podResourceAllocation { - require.NoError(t, originalSC.SetPodResourceAllocation(podUID, alloc)) + for podUID, alloc := range tt.args.resInfoMap { + require.NoError(t, originalSC.SetPodResourceInfo(podUID, alloc)) require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written") } // Setting a new value should update the checkpoint. 
-	require.NoError(t, originalSC.SetPodResourceAllocation("foo-bar", map[string]v1.ResourceRequirements{
-		"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
+	require.NoError(t, originalSC.SetPodResourceInfo("foo-bar", PodResourceInfo{
+		ContainerResources: map[string]v1.ResourceRequirements{
+			"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
+		},
 	}))
 	require.FileExists(t, checkpointPath, "checkpoint should be re-written")
 })
@@ -153,13 +157,15 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
 
 	// prepare old checkpoint, ResizeStatusEntries is unset,
 	// pretend that the old checkpoint is unaware for the field ResizeStatusEntries
-	const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
-	expectedPodResourceAllocation := PodResourceAllocation{
+	const checkpointContent = `{"data":"{\"entries\":{\"pod1\":{\"ContainerResources\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}}","checksum":1178570812}`
+	expectedPodResourceAllocation := PodResourceInfoMap{
 		"pod1": {
-			"container1": {
-				Requests: v1.ResourceList{
-					v1.ResourceCPU:    resource.MustParse("1Ki"),
-					v1.ResourceMemory: resource.MustParse("1Ki"),
+			ContainerResources: map[string]v1.ResourceRequirements{
+				"container1": {
+					Requests: v1.ResourceList{
+						v1.ResourceCPU:    resource.MustParse("1Ki"),
+						v1.ResourceMemory: resource.MustParse("1Ki"),
+					},
 				},
 			},
 		},
@@ -178,7 +184,7 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
 
 	sc.cache = NewStateMemory(actualPodResourceAllocation)
 
-	actualPodResourceAllocation = sc.cache.GetPodResourceAllocation()
+	actualPodResourceAllocation = sc.cache.GetPodResourceInfoMap()
 
 	require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
 }
diff --git a/pkg/kubelet/allocation/state/state_mem.go b/pkg/kubelet/allocation/state/state_mem.go
index 5ad0e57f53e..e7e44503c64 100644
--- a/pkg/kubelet/allocation/state/state_mem.go
+++ b/pkg/kubelet/allocation/state/state_mem.go
@@ -27,63 +27,73 @@ import (
 type stateMemory struct {
 	sync.RWMutex
-	podAllocation PodResourceAllocation
+	podResources PodResourceInfoMap
 }
 
 var _ State = &stateMemory{}
 
-// NewStateMemory creates new State to track resources allocated to pods
-func NewStateMemory(alloc PodResourceAllocation) State {
-	if alloc == nil {
-		alloc = PodResourceAllocation{}
+// NewStateMemory creates new State to track resource information for pods
+func NewStateMemory(resources PodResourceInfoMap) State {
+	if resources == nil {
+		resources = PodResourceInfoMap{}
 	}
-	klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
+	klog.V(2).InfoS("Initialized new in-memory state store for pod resource information tracking")
 	return &stateMemory{
-		podAllocation: alloc,
+		podResources: resources,
 	}
 }
 
-func (s *stateMemory) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
+func (s *stateMemory) GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
 	s.RLock()
 	defer s.RUnlock()
-	alloc, ok := s.podAllocation[podUID][containerName]
-	return *alloc.DeepCopy(), ok
+	resourceInfo, ok := s.podResources[podUID]
+	if !ok {
+		return v1.ResourceRequirements{}, ok
+	}
+
+	resources, ok := resourceInfo.ContainerResources[containerName]
+	if !ok {
+		return
v1.ResourceRequirements{}, ok + } + return *resources.DeepCopy(), ok } -func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation { +func (s *stateMemory) GetPodResourceInfoMap() PodResourceInfoMap { s.RLock() defer s.RUnlock() - return s.podAllocation.Clone() + return s.podResources.Clone() } -func (s *stateMemory) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error { +func (s *stateMemory) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error { s.Lock() defer s.Unlock() - if _, ok := s.podAllocation[podUID]; !ok { - s.podAllocation[podUID] = make(map[string]v1.ResourceRequirements) + if _, ok := s.podResources[podUID]; !ok { + s.podResources[podUID] = PodResourceInfo{ + ContainerResources: make(map[string]v1.ResourceRequirements), + } } - s.podAllocation[podUID][containerName] = alloc - klog.V(3).InfoS("Updated container resource allocation", "podUID", podUID, "containerName", containerName, "alloc", alloc) + s.podResources[podUID].ContainerResources[containerName] = resources + klog.V(3).InfoS("Updated container resource information", "podUID", podUID, "containerName", containerName, "resources", resources) return nil } -func (s *stateMemory) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error { +func (s *stateMemory) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error { s.Lock() defer s.Unlock() - s.podAllocation[podUID] = alloc - klog.V(3).InfoS("Updated pod resource allocation", "podUID", podUID, "allocation", alloc) + s.podResources[podUID] = resourceInfo + klog.V(3).InfoS("Updated pod resource information", "podUID", podUID, "information", resourceInfo) return nil } func (s *stateMemory) RemovePod(podUID types.UID) error { s.Lock() defer s.Unlock() - delete(s.podAllocation, podUID) - klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID) + delete(s.podResources, podUID) + klog.V(3).InfoS("Deleted pod resource information", "podUID", podUID) return nil } @@ -91,9 +101,9 @@ func (s *stateMemory) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { s.Lock() defer s.Unlock() - for podUID := range s.podAllocation { + for podUID := range s.podResources { if _, ok := remainingPods[types.UID(podUID)]; !ok { - delete(s.podAllocation, podUID) + delete(s.podResources, podUID) } } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 758df2b0bc0..c8e3500d6d6 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2458,17 +2458,19 @@ func TestPodResourceAllocationReset(t *testing.T) { emptyPodSpec.Containers[0].Resources.Requests = v1.ResourceList{} tests := []struct { - name string - pod *v1.Pod - existingPodAllocation *v1.Pod - expectedPodResourceAllocation state.PodResourceAllocation + name string + pod *v1.Pod + existingPodAllocation *v1.Pod + expectedPodResourceInfoMap state.PodResourceInfoMap }{ { name: "Having both memory and cpu, resource allocation not exists", pod: podWithUIDNameNsSpec("1", "pod1", "foo", *cpu500mMem500MPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "1": map[string]v1.ResourceRequirements{ - cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "1": { + ContainerResources: map[string]v1.ResourceRequirements{ + cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, + }, }, }, }, 
@@ -2476,9 +2478,11 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Having both memory and cpu, resource allocation exists", pod: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "2": map[string]v1.ResourceRequirements{ - cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "2": { + ContainerResources: map[string]v1.ResourceRequirements{ + cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2486,18 +2490,22 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Having both memory and cpu, resource allocation exists (with different value)", pod: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu500mMem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu800mMem800MPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "3": map[string]v1.ResourceRequirements{ - cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "3": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources, + }, }, }, }, { name: "Only has cpu, resource allocation not exists", pod: podWithUIDNameNsSpec("4", "pod5", "foo", *cpu500mPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "4": map[string]v1.ResourceRequirements{ - cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "4": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2505,9 +2513,11 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has cpu, resource allocation exists", pod: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec), existingPodAllocation: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "5": map[string]v1.ResourceRequirements{ - cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "5": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2515,18 +2525,22 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has cpu, resource allocation exists (with different value)", pod: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu500mPodSpec), existingPodAllocation: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu800mPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "6": map[string]v1.ResourceRequirements{ - cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "6": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources, + }, }, }, }, { name: "Only has memory, resource allocation not exists", pod: podWithUIDNameNsSpec("7", "pod7", "foo", *mem500MPodSpec), - 
expectedPodResourceAllocation: state.PodResourceAllocation{ - "7": map[string]v1.ResourceRequirements{ - mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "7": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2534,9 +2548,11 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has memory, resource allocation exists", pod: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "8": map[string]v1.ResourceRequirements{ - mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "8": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2544,18 +2560,22 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has memory, resource allocation exists (with different value)", pod: podWithUIDNameNsSpec("9", "pod9", "foo", *mem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("9", "pod9", "foo", *mem800MPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "9": map[string]v1.ResourceRequirements{ - mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "9": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources, + }, }, }, }, { name: "No CPU and memory, resource allocation not exists", pod: podWithUIDNameNsSpec("10", "pod10", "foo", *emptyPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "10": map[string]v1.ResourceRequirements{ - emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "10": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2563,9 +2583,11 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "No CPU and memory, resource allocation exists", pod: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec), existingPodAllocation: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec), - expectedPodResourceAllocation: state.PodResourceAllocation{ - "11": map[string]v1.ResourceRequirements{ - emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, + expectedPodResourceInfoMap: state.PodResourceInfoMap{ + "11": state.PodResourceInfo{ + ContainerResources: map[string]v1.ResourceRequirements{ + emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, + }, }, }, }, @@ -2585,7 +2607,7 @@ func TestPodResourceAllocationReset(t *testing.T) { if !found { t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) } - assert.Equal(t, tc.expectedPodResourceAllocation[tc.pod.UID][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) + assert.Equal(t, tc.expectedPodResourceInfoMap[tc.pod.UID].ContainerResources[tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) }) } }
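-- 
Reviewer note (illustrative, not part of the patch): the sketch below shows
the shape change in miniature — container requirements now hang off a
PodResourceInfo struct instead of a bare nested map, so pod-scoped fields can
be added later without renaming the map type again. The type definitions are
local mirrors of the ones added in pkg/kubelet/allocation/state; real callers
would use that package directly, and "pod-uid-1" is a made-up example UID.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
)

// PodResourceInfo mirrors the struct introduced in state.go. A future
// pod-level resources field could be added here without touching the
// PodResourceInfoMap shape.
type PodResourceInfo struct {
	ContainerResources map[string]v1.ResourceRequirements
}

// PodResourceInfoMap mirrors the type that replaces the old
// map[types.UID]map[string]v1.ResourceRequirements.
type PodResourceInfoMap map[types.UID]PodResourceInfo

func main() {
	infoMap := PodResourceInfoMap{
		types.UID("pod-uid-1"): {
			ContainerResources: map[string]v1.ResourceRequirements{
				"c1": {
					Requests: v1.ResourceList{
						v1.ResourceCPU: resource.MustParse("500m"),
					},
				},
			},
		},
	}

	// Reads now go through the ContainerResources field, matching
	// GetContainerResources in this patch.
	if info, ok := infoMap["pod-uid-1"]; ok {
		if req, ok := info.ContainerResources["c1"]; ok {
			fmt.Println("c1 cpu request:", req.Requests.Cpu().String())
		}
	}
}

Wrapping the inner map in a struct means the checkpoint schema can grow new
pod-scoped entries without another wholesale rename; the cost is the one-time
checksum change exercised by the updated Test_stateCheckpoint_formatUpgraded.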