Replace PodResourceAllocation with PodResourceInfoMap type for

extensibility for pod-level resources support
This commit is contained in:
ndixita 2025-03-12 23:45:54 +00:00
parent 473ec01548
commit 091b450057
8 changed files with 232 additions and 177 deletions

View File

@ -103,18 +103,18 @@ func NewInMemoryManager() Manager {
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
return m.allocated.GetContainerResourceAllocation(podUID, containerName)
return m.allocated.GetContainerResources(podUID, containerName)
}
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
allocs := m.allocated.GetPodResourceAllocation()
allocs := m.allocated.GetPodResourceInfoMap()
return updatePodFromAllocation(pod, allocs)
}
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceInfoMap) (*v1.Pod, bool) {
allocated, found := allocs[pod.UID]
if !found {
return pod, false
@ -122,7 +122,7 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
updated := false
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
if cAlloc, ok := allocated[c.Name]; ok {
if cAlloc, ok := allocated.ContainerResources[c.Name]; ok {
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
// Allocation differs from pod spec, retrieve the allocation
if !updated {
@ -153,21 +153,22 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
// SetAllocatedResources checkpoints the resources allocated to a pod's containers
func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod))
return m.allocated.SetPodResourceInfo(pod.UID, allocationFromPod(pod))
}
func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
podAlloc := make(map[string]v1.ResourceRequirements)
func allocationFromPod(pod *v1.Pod) state.PodResourceInfo {
var podAlloc state.PodResourceInfo
podAlloc.ContainerResources = make(map[string]v1.ResourceRequirements)
for _, container := range pod.Spec.Containers {
alloc := *container.Resources.DeepCopy()
podAlloc[container.Name] = alloc
podAlloc.ContainerResources[container.Name] = alloc
}
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
for _, container := range pod.Spec.InitContainers {
if podutil.IsRestartableInitContainer(&container) {
alloc := *container.Resources.DeepCopy()
podAlloc[container.Name] = alloc
podAlloc.ContainerResources[container.Name] = alloc
}
}
}
@ -195,12 +196,12 @@ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error {
if actuatedContainer == nil {
alloc := allocationFromPod(allocatedPod)
return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc)
return m.actuated.SetPodResourceInfo(allocatedPod.UID, alloc)
}
return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources)
return m.actuated.SetContainerResources(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources)
}
func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
return m.actuated.GetContainerResourceAllocation(podUID, containerName)
return m.actuated.GetContainerResources(podUID, containerName)
}

View File

@ -103,44 +103,50 @@ func TestUpdatePodFromAllocation(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
allocs state.PodResourceAllocation
allocs state.PodResourceInfoMap
expectPod *v1.Pod
expectUpdate bool
}{{
name: "steady state",
pod: pod,
allocs: state.PodResourceAllocation{
pod.UID: map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
allocs: state.PodResourceInfoMap{
pod.UID: state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
},
},
},
expectUpdate: false,
}, {
name: "no allocations",
pod: pod,
allocs: state.PodResourceAllocation{},
allocs: state.PodResourceInfoMap{},
expectUpdate: false,
}, {
name: "missing container allocation",
pod: pod,
allocs: state.PodResourceAllocation{
pod.UID: map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
allocs: state.PodResourceInfoMap{
pod.UID: state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
},
expectUpdate: false,
}, {
name: "resized container",
pod: pod,
allocs: state.PodResourceAllocation{
pod.UID: map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
allocs: state.PodResourceInfoMap{
pod.UID: state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
},
},
},
expectUpdate: true,

View File

@ -20,16 +20,14 @@ import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
var _ checkpointmanager.Checkpoint = &Checkpoint{}
type PodResourceAllocationInfo struct {
AllocationEntries map[types.UID]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
type PodResourceCheckpointInfo struct {
Entries PodResourceInfoMap `json:"entries,omitempty"`
}
// Checkpoint represents a structure to store pod resource allocation checkpoint data
@ -41,7 +39,7 @@ type Checkpoint struct {
}
// NewCheckpoint creates a new checkpoint from a list of claim info states
func NewCheckpoint(allocations *PodResourceAllocationInfo) (*Checkpoint, error) {
func NewCheckpoint(allocations *PodResourceCheckpointInfo) (*Checkpoint, error) {
serializedAllocations, err := json.Marshal(allocations)
if err != nil {
@ -70,9 +68,9 @@ func (cp *Checkpoint) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}
// GetPodResourceAllocationInfo returns Pod Resource Allocation info states from checkpoint
func (cp *Checkpoint) GetPodResourceAllocationInfo() (*PodResourceAllocationInfo, error) {
var data PodResourceAllocationInfo
// GetPodResourceCheckpointInfo returns Pod Resource Allocation info states from checkpoint
func (cp *Checkpoint) GetPodResourceCheckpointInfo() (*PodResourceCheckpointInfo, error) {
var data PodResourceCheckpointInfo
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
return nil, err
}

View File

@ -22,36 +22,45 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)
// PodResourceAllocation type is used in tracking resources allocated to pod's containers
type PodResourceAllocation map[types.UID]map[string]v1.ResourceRequirements
// PodResourceInfo stores resource requirements for containers within a pod.
type PodResourceInfo struct {
// ContainerResources maps container names to their respective ResourceRequirements.
ContainerResources map[string]v1.ResourceRequirements
}
// Clone returns a copy of PodResourceAllocation
func (pr PodResourceAllocation) Clone() PodResourceAllocation {
prCopy := make(PodResourceAllocation)
for pod := range pr {
prCopy[pod] = make(map[string]v1.ResourceRequirements)
for container, alloc := range pr[pod] {
prCopy[pod][container] = *alloc.DeepCopy()
// PodResourceInfoMap maps pod UIDs to their corresponding PodResourceInfo,
// tracking resource requirements for all containers within each pod.
type PodResourceInfoMap map[types.UID]PodResourceInfo
// Clone returns a copy of PodResourceInfoMap
func (pr PodResourceInfoMap) Clone() PodResourceInfoMap {
prCopy := make(PodResourceInfoMap)
for podUID, podInfo := range pr {
prCopy[podUID] = PodResourceInfo{
ContainerResources: make(map[string]v1.ResourceRequirements),
}
for containerName, containerInfo := range podInfo.ContainerResources {
prCopy[podUID].ContainerResources[containerName] = *containerInfo.DeepCopy()
}
}
return prCopy
}
// Reader interface used to read current pod resource allocation state
// Reader interface used to read current pod resource state
type Reader interface {
GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
GetPodResourceAllocation() PodResourceAllocation
GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
GetPodResourceInfoMap() PodResourceInfoMap
}
type writer interface {
SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error
SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error
SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error
RemovePod(podUID types.UID) error
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID])
}
// State interface provides methods for tracking and setting pod resource allocation
// State interface provides methods for tracking and setting pod resources
type State interface {
Reader
writer

View File

@ -40,17 +40,17 @@ type stateCheckpoint struct {
lastChecksum checksum.Checksum
}
// NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend
// NewStateCheckpoint creates new State for keeping track of pod resource information with checkpoint backend
func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod resource information tracking: %w", err)
}
pra, checksum, err := restoreState(checkpointManager, checkpointName)
if err != nil {
//lint:ignore ST1005 user-facing error message
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod resource information checkpoint file %q before restarting Kubelet",
err, path.Join(stateDir, checkpointName))
}
@ -64,7 +64,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
}
// restores state from a checkpoint and creates it if it doesn't exist
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) {
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceInfoMap, checksum.Checksum, error) {
checkpoint := &Checkpoint{}
if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound {
@ -73,21 +73,21 @@ func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpo
return nil, 0, err
}
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
praInfo, err := checkpoint.GetPodResourceCheckpointInfo()
if err != nil {
return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err)
return nil, 0, fmt.Errorf("failed to get pod resource information: %w", err)
}
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return praInfo.AllocationEntries, checkpoint.Checksum, nil
klog.V(2).InfoS("State checkpoint: restored pod resource state from checkpoint")
return praInfo.Entries, checkpoint.Checksum, nil
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) storeState() error {
podAllocation := sc.cache.GetPodResourceAllocation()
resourceInfo := sc.cache.GetPodResourceInfoMap()
checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
AllocationEntries: podAllocation,
checkpoint, err := NewCheckpoint(&PodResourceCheckpointInfo{
Entries: resourceInfo,
})
if err != nil {
return fmt.Errorf("failed to create checkpoint: %w", err)
@ -98,47 +98,50 @@ func (sc *stateCheckpoint) storeState() error {
}
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
klog.ErrorS(err, "Failed to save pod allocation checkpoint")
klog.ErrorS(err, "Failed to save pod resource information checkpoint")
return err
}
sc.lastChecksum = checkpoint.Checksum
return nil
}
// GetContainerResourceAllocation returns current resources allocated to a pod's container
func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
// GetContainerResources returns current resources information to a pod's container
func (sc *stateCheckpoint) GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetContainerResourceAllocation(podUID, containerName)
return sc.cache.GetContainerResources(podUID, containerName)
}
// GetPodResourceAllocation returns current pod resource allocation
func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
// GetPodResourceInfoMap returns current pod resource information
func (sc *stateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetPodResourceAllocation()
return sc.cache.GetPodResourceInfoMap()
}
// SetContainerResourceAllocation sets resources allocated to a pod's container
func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error {
// SetContainerResoruces sets resources information for a pod's container
func (sc *stateCheckpoint) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc)
return sc.storeState()
}
// SetPodResourceAllocation sets pod resource allocation
func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error {
sc.mux.Lock()
defer sc.mux.Unlock()
err := sc.cache.SetPodResourceAllocation(podUID, alloc)
err := sc.cache.SetContainerResources(podUID, containerName, resources)
if err != nil {
return err
}
return sc.storeState()
}
// Delete deletes allocations for specified pod
// SetPodResourceInfo sets pod resource information
func (sc *stateCheckpoint) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
sc.mux.Lock()
defer sc.mux.Unlock()
err := sc.cache.SetPodResourceInfo(podUID, resourceInfo)
if err != nil {
return err
}
return sc.storeState()
}
// Delete deletes resource information for specified pod
func (sc *stateCheckpoint) RemovePod(podUID types.UID) error {
sc.mux.Lock()
defer sc.mux.Unlock()
@ -161,19 +164,19 @@ func NewNoopStateCheckpoint() State {
return &noopStateCheckpoint{}
}
func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
func (sc *noopStateCheckpoint) GetContainerResources(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
return v1.ResourceRequirements{}, false
}
func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
func (sc *noopStateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
return nil
}
func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ types.UID, _ string, _ v1.ResourceRequirements) error {
func (sc *noopStateCheckpoint) SetContainerResources(_ types.UID, _ string, _ v1.ResourceRequirements) error {
return nil
}
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[string]v1.ResourceRequirements) error {
func (sc *noopStateCheckpoint) SetPodResourceInfo(_ types.UID, _ PodResourceInfo) error {
return nil
}

View File

@ -33,7 +33,7 @@ const testCheckpoint = "pod_status_manager_state"
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
testingDir := getTestDir(t)
cache := NewStateMemory(PodResourceAllocation{})
cache := NewStateMemory(PodResourceInfoMap{})
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
require.NoError(t, err, "failed to create checkpoint manager")
checkpointName := "pod_state_checkpoint"
@ -56,12 +56,12 @@ func getTestDir(t *testing.T) string {
return testingDir
}
func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceAllocation, msgAndArgs string) {
for podUID, containerResourceList := range *expected {
require.Equal(t, len(containerResourceList), len((*actual)[podUID]), msgAndArgs)
for containerName, resourceList := range containerResourceList {
func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceInfoMap, msgAndArgs string) {
for podUID, podResourceInfo := range *expected {
require.Equal(t, len(podResourceInfo.ContainerResources), len((*actual)[podUID].ContainerResources), msgAndArgs)
for containerName, resourceList := range podResourceInfo.ContainerResources {
for name, quantity := range resourceList.Requests {
require.True(t, quantity.Equal((*actual)[podUID][containerName].Requests[name]), msgAndArgs)
require.True(t, quantity.Equal((*actual)[podUID].ContainerResources[containerName].Requests[name]), msgAndArgs)
}
}
}
@ -69,7 +69,7 @@ func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceAllo
func Test_stateCheckpoint_storeState(t *testing.T) {
type args struct {
podResourceAllocation PodResourceAllocation
resInfoMap PodResourceInfoMap
}
tests := []struct {
@ -91,12 +91,14 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
}{
name: fmt.Sprintf("resource - %s%s", fact, suf),
args: args{
podResourceAllocation: PodResourceAllocation{
resInfoMap: PodResourceInfoMap{
"pod1": {
"container1": {
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
ContainerResources: map[string]v1.ResourceRequirements{
"container1": {
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
},
},
},
},
@ -111,33 +113,35 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
require.NoError(t, err)
for podUID, alloc := range tt.args.podResourceAllocation {
err = originalSC.SetPodResourceAllocation(podUID, alloc)
for podUID, alloc := range tt.args.resInfoMap {
err = originalSC.SetPodResourceInfo(podUID, alloc)
require.NoError(t, err)
}
actual := originalSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation")
actual := originalSC.GetPodResourceInfoMap()
verifyPodResourceAllocation(t, &tt.args.resInfoMap, &actual, "stored pod resource allocation is not equal to original pod resource allocation")
newSC, err := NewStateCheckpoint(testDir, testCheckpoint)
require.NoError(t, err)
actual = newSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation")
actual = newSC.GetPodResourceInfoMap()
verifyPodResourceAllocation(t, &tt.args.resInfoMap, &actual, "restored pod resource allocation is not equal to original pod resource allocation")
checkpointPath := filepath.Join(testDir, testCheckpoint)
require.FileExists(t, checkpointPath)
require.NoError(t, os.Remove(checkpointPath)) // Remove the checkpoint file to track whether it's re-written.
// Setting the pod allocations to the same values should not re-write the checkpoint.
for podUID, alloc := range tt.args.podResourceAllocation {
require.NoError(t, originalSC.SetPodResourceAllocation(podUID, alloc))
for podUID, alloc := range tt.args.resInfoMap {
require.NoError(t, originalSC.SetPodResourceInfo(podUID, alloc))
require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written")
}
// Setting a new value should update the checkpoint.
require.NoError(t, originalSC.SetPodResourceAllocation("foo-bar", map[string]v1.ResourceRequirements{
"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
require.NoError(t, originalSC.SetPodResourceInfo("foo-bar", PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
},
}))
require.FileExists(t, checkpointPath, "checkpoint should be re-written")
})
@ -153,13 +157,15 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
// prepare old checkpoint, ResizeStatusEntries is unset,
// pretend that the old checkpoint is unaware for the field ResizeStatusEntries
const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
expectedPodResourceAllocation := PodResourceAllocation{
const checkpointContent = `{"data":"{\"entries\":{\"pod1\":{\"ContainerResources\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}}","checksum":1178570812}`
expectedPodResourceAllocation := PodResourceInfoMap{
"pod1": {
"container1": {
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1Ki"),
v1.ResourceMemory: resource.MustParse("1Ki"),
ContainerResources: map[string]v1.ResourceRequirements{
"container1": {
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1Ki"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
},
@ -178,7 +184,7 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
sc.cache = NewStateMemory(actualPodResourceAllocation)
actualPodResourceAllocation = sc.cache.GetPodResourceAllocation()
actualPodResourceAllocation = sc.cache.GetPodResourceInfoMap()
require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
}

View File

@ -27,63 +27,73 @@ import (
type stateMemory struct {
sync.RWMutex
podAllocation PodResourceAllocation
podResources PodResourceInfoMap
}
var _ State = &stateMemory{}
// NewStateMemory creates new State to track resources allocated to pods
func NewStateMemory(alloc PodResourceAllocation) State {
if alloc == nil {
alloc = PodResourceAllocation{}
// NewStateMemory creates new State to track resources resourcesated to pods
func NewStateMemory(resources PodResourceInfoMap) State {
if resources == nil {
resources = PodResourceInfoMap{}
}
klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
klog.V(2).InfoS("Initialized new in-memory state store for pod resource information tracking")
return &stateMemory{
podAllocation: alloc,
podResources: resources,
}
}
func (s *stateMemory) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
func (s *stateMemory) GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
s.RLock()
defer s.RUnlock()
alloc, ok := s.podAllocation[podUID][containerName]
return *alloc.DeepCopy(), ok
resourceInfo, ok := s.podResources[podUID]
if !ok {
return v1.ResourceRequirements{}, ok
}
resources, ok := resourceInfo.ContainerResources[containerName]
if !ok {
return v1.ResourceRequirements{}, ok
}
return *resources.DeepCopy(), ok
}
func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation {
func (s *stateMemory) GetPodResourceInfoMap() PodResourceInfoMap {
s.RLock()
defer s.RUnlock()
return s.podAllocation.Clone()
return s.podResources.Clone()
}
func (s *stateMemory) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error {
func (s *stateMemory) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error {
s.Lock()
defer s.Unlock()
if _, ok := s.podAllocation[podUID]; !ok {
s.podAllocation[podUID] = make(map[string]v1.ResourceRequirements)
if _, ok := s.podResources[podUID]; !ok {
s.podResources[podUID] = PodResourceInfo{
ContainerResources: make(map[string]v1.ResourceRequirements),
}
}
s.podAllocation[podUID][containerName] = alloc
klog.V(3).InfoS("Updated container resource allocation", "podUID", podUID, "containerName", containerName, "alloc", alloc)
s.podResources[podUID].ContainerResources[containerName] = resources
klog.V(3).InfoS("Updated container resource information", "podUID", podUID, "containerName", containerName, "resources", resources)
return nil
}
func (s *stateMemory) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error {
func (s *stateMemory) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
s.Lock()
defer s.Unlock()
s.podAllocation[podUID] = alloc
klog.V(3).InfoS("Updated pod resource allocation", "podUID", podUID, "allocation", alloc)
s.podResources[podUID] = resourceInfo
klog.V(3).InfoS("Updated pod resource information", "podUID", podUID, "information", resourceInfo)
return nil
}
func (s *stateMemory) RemovePod(podUID types.UID) error {
s.Lock()
defer s.Unlock()
delete(s.podAllocation, podUID)
klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID)
delete(s.podResources, podUID)
klog.V(3).InfoS("Deleted pod resource information", "podUID", podUID)
return nil
}
@ -91,9 +101,9 @@ func (s *stateMemory) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
s.Lock()
defer s.Unlock()
for podUID := range s.podAllocation {
for podUID := range s.podResources {
if _, ok := remainingPods[types.UID(podUID)]; !ok {
delete(s.podAllocation, podUID)
delete(s.podResources, podUID)
}
}
}

View File

@ -2458,17 +2458,19 @@ func TestPodResourceAllocationReset(t *testing.T) {
emptyPodSpec.Containers[0].Resources.Requests = v1.ResourceList{}
tests := []struct {
name string
pod *v1.Pod
existingPodAllocation *v1.Pod
expectedPodResourceAllocation state.PodResourceAllocation
name string
pod *v1.Pod
existingPodAllocation *v1.Pod
expectedPodResourceInfoMap state.PodResourceInfoMap
}{
{
name: "Having both memory and cpu, resource allocation not exists",
pod: podWithUIDNameNsSpec("1", "pod1", "foo", *cpu500mMem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"1": map[string]v1.ResourceRequirements{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"1": {
ContainerResources: map[string]v1.ResourceRequirements{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources,
},
},
},
},
@ -2476,9 +2478,11 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Having both memory and cpu, resource allocation exists",
pod: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"2": map[string]v1.ResourceRequirements{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"2": {
ContainerResources: map[string]v1.ResourceRequirements{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources,
},
},
},
},
@ -2486,18 +2490,22 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Having both memory and cpu, resource allocation exists (with different value)",
pod: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu500mMem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu800mMem800MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"3": map[string]v1.ResourceRequirements{
cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"3": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources,
},
},
},
},
{
name: "Only has cpu, resource allocation not exists",
pod: podWithUIDNameNsSpec("4", "pod5", "foo", *cpu500mPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"4": map[string]v1.ResourceRequirements{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"4": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources,
},
},
},
},
@ -2505,9 +2513,11 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Only has cpu, resource allocation exists",
pod: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"5": map[string]v1.ResourceRequirements{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"5": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources,
},
},
},
},
@ -2515,18 +2525,22 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Only has cpu, resource allocation exists (with different value)",
pod: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu500mPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu800mPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"6": map[string]v1.ResourceRequirements{
cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"6": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources,
},
},
},
},
{
name: "Only has memory, resource allocation not exists",
pod: podWithUIDNameNsSpec("7", "pod7", "foo", *mem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"7": map[string]v1.ResourceRequirements{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"7": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources,
},
},
},
},
@ -2534,9 +2548,11 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Only has memory, resource allocation exists",
pod: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"8": map[string]v1.ResourceRequirements{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"8": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources,
},
},
},
},
@ -2544,18 +2560,22 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Only has memory, resource allocation exists (with different value)",
pod: podWithUIDNameNsSpec("9", "pod9", "foo", *mem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("9", "pod9", "foo", *mem800MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"9": map[string]v1.ResourceRequirements{
mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"9": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources,
},
},
},
},
{
name: "No CPU and memory, resource allocation not exists",
pod: podWithUIDNameNsSpec("10", "pod10", "foo", *emptyPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"10": map[string]v1.ResourceRequirements{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"10": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources,
},
},
},
},
@ -2563,9 +2583,11 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "No CPU and memory, resource allocation exists",
pod: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"11": map[string]v1.ResourceRequirements{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources,
expectedPodResourceInfoMap: state.PodResourceInfoMap{
"11": state.PodResourceInfo{
ContainerResources: map[string]v1.ResourceRequirements{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources,
},
},
},
},
@ -2585,7 +2607,7 @@ func TestPodResourceAllocationReset(t *testing.T) {
if !found {
t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
}
assert.Equal(t, tc.expectedPodResourceAllocation[tc.pod.UID][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name)
assert.Equal(t, tc.expectedPodResourceInfoMap[tc.pod.UID].ContainerResources[tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name)
})
}
}