diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 096d39b88c1..b8bbfece860 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2922,7 +2922,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine if fit { // Update pod resource allocation checkpoint if err := kl.statusManager.SetPodAllocation(pod); err != nil { - klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod)) + return nil, err } for i, container := range pod.Spec.Containers { if !apiequality.Semantic.DeepEqual(container.Resources, allocatedPod.Spec.Containers[i].Resources) { diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go index 9590b230be9..897bc34c5f4 100644 --- a/pkg/kubelet/status/fake_status_manager.go +++ b/pkg/kubelet/status/fake_status_manager.go @@ -93,6 +93,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe // NewFakeManager creates empty/fake memory manager func NewFakeManager() Manager { return &fakeManager{ - state: state.NewStateMemory(state.PodResourceAllocation{}, state.PodResizeStatus{}), + state: state.NewStateMemory(state.PodResourceAllocation{}), } } diff --git a/pkg/kubelet/status/state/state.go b/pkg/kubelet/status/state/state.go index d980c96dea7..9dc669746e9 100644 --- a/pkg/kubelet/status/state/state.go +++ b/pkg/kubelet/status/state/state.go @@ -47,7 +47,6 @@ type Reader interface { type writer interface { SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error - SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) Delete(podUID string, containerName string) error ClearState() error diff --git a/pkg/kubelet/status/state/state_checkpoint.go b/pkg/kubelet/status/state/state_checkpoint.go index faa53db0525..d20d15ee07b 100644 --- a/pkg/kubelet/status/state/state_checkpoint.go +++ b/pkg/kubelet/status/state/state_checkpoint.go @@ -42,50 +42,43 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) { if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) } + + praInfo, err := restoreState(checkpointManager, checkpointName) + if err != nil { + //lint:ignore ST1005 user-facing error message + return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", + err, path.Join(stateDir, checkpointName)) + } + stateCheckpoint := &stateCheckpoint{ - cache: NewStateMemory(PodResourceAllocation{}, PodResizeStatus{}), + cache: NewStateMemory(praInfo.AllocationEntries), checkpointManager: checkpointManager, checkpointName: checkpointName, } - if err := stateCheckpoint.restoreState(); err != nil { - //lint:ignore ST1005 user-facing error message - return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName)) - } + klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") return stateCheckpoint, nil } // restores state from a checkpoint and creates it if it doesn't exist -func (sc *stateCheckpoint) restoreState() error { - sc.mux.Lock() - defer sc.mux.Unlock() +func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) { var err error + checkpoint := &Checkpoint{} - checkpoint, err := NewCheckpoint(nil) - if err != nil { - return fmt.Errorf("failed to create new checkpoint: %w", err) - } - - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { + if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { if err == errors.ErrCheckpointNotFound { - return sc.storeState() + return &PodResourceAllocationInfo{ + AllocationEntries: make(map[string]map[string]v1.ResourceRequirements), + }, nil } - return err + return nil, err } praInfo, err := checkpoint.GetPodResourceAllocationInfo() if err != nil { - return fmt.Errorf("failed to get pod resource allocation info: %w", err) + return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err) } - for podUID, alloc := range praInfo.AllocationEntries { - err = sc.cache.SetPodResourceAllocation(podUID, alloc) - if err != nil { - klog.ErrorS(err, "failed to set pod resource allocation") - } - } - - klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") - return nil + return praInfo, nil } // saves state to a checkpoint, caller is responsible for locking @@ -135,19 +128,6 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain return sc.storeState() } -// SetPodResourceAllocation sets pod resource allocation -func (sc *stateCheckpoint) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error { - sc.mux.Lock() - defer sc.mux.Unlock() - - err := sc.cache.SetPodResourceAllocation(podUID, alloc) - if err != nil { - return err - } - - return sc.storeState() -} - // SetPodResizeStatus sets the last resize decision for a pod func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { sc.mux.Lock() @@ -194,10 +174,6 @@ func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string return nil } -func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v1.ResourceRequirements) error { - return nil -} - func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {} func (sc *noopStateCheckpoint) Delete(_ string, _ string) error { diff --git a/pkg/kubelet/status/state/state_checkpoint_test.go b/pkg/kubelet/status/state/state_checkpoint_test.go index f5467542afa..9a927b60372 100644 --- a/pkg/kubelet/status/state/state_checkpoint_test.go +++ b/pkg/kubelet/status/state/state_checkpoint_test.go @@ -32,7 +32,7 @@ const testCheckpoint = "pod_status_manager_state" func newTestStateCheckpoint(t *testing.T) *stateCheckpoint { testingDir := getTestDir(t) - cache := NewStateMemory(PodResourceAllocation{}, PodResizeStatus{}) + cache := NewStateMemory(PodResourceAllocation{}) checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir) require.NoError(t, err, "failed to create checkpoint manager") checkpointName := "pod_state_checkpoint" @@ -110,9 +110,11 @@ func Test_stateCheckpoint_storeState(t *testing.T) { originalSC, err := NewStateCheckpoint(testDir, testCheckpoint) require.NoError(t, err) - for podUID, alloc := range tt.args.podResourceAllocation { - err = originalSC.SetPodResourceAllocation(podUID, alloc) - require.NoError(t, err) + for podUID, containerAlloc := range tt.args.podResourceAllocation { + for containerName, alloc := range containerAlloc { + err = originalSC.SetContainerResourceAllocation(podUID, containerName, alloc) + require.NoError(t, err) + } } actual := originalSC.GetPodResourceAllocation() @@ -156,11 +158,15 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) require.NoError(t, err, "failed to create old checkpoint") - err = sc.restoreState() + actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName) require.NoError(t, err, "failed to restore state") - actualPodResourceAllocationInfo := &PodResourceAllocationInfo{} + require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") + + sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries) + + actualPodResourceAllocationInfo = &PodResourceAllocationInfo{} actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() - require.NoError(t, err, "failed to get pod resource allocation info") + require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") } diff --git a/pkg/kubelet/status/state/state_mem.go b/pkg/kubelet/status/state/state_mem.go index 8730f0ea1d3..fca30e3f046 100644 --- a/pkg/kubelet/status/state/state_mem.go +++ b/pkg/kubelet/status/state/state_mem.go @@ -32,11 +32,14 @@ type stateMemory struct { var _ State = &stateMemory{} // NewStateMemory creates new State to track resources allocated to pods -func NewStateMemory(alloc PodResourceAllocation, stats PodResizeStatus) State { +func NewStateMemory(alloc PodResourceAllocation) State { + if alloc == nil { + alloc = PodResourceAllocation{} + } klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking") return &stateMemory{ podAllocation: alloc, - podResizeStatus: stats, + podResizeStatus: PodResizeStatus{}, } } @@ -74,18 +77,6 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam return nil } -func (s *stateMemory) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error { - s.Lock() - defer s.Unlock() - - for containerName, containerAlloc := range alloc { - s.podAllocation[podUID][containerName] = containerAlloc - } - - klog.V(3).InfoS("Updated pod resource allocation", "podUID", podUID, "allocation", alloc) - return nil -} - func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { s.Lock() defer s.Unlock() diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 7937d27bb53..3da131d5392 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -296,15 +296,13 @@ func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus { func (m *manager) SetPodAllocation(pod *v1.Pod) error { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() - - podAlloc := make(map[string]v1.ResourceRequirements) - for _, container := range pod.Spec.Containers { alloc := *container.Resources.DeepCopy() - podAlloc[container.Name] = alloc + if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil { + return err + } } - - return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc) + return nil } // SetPodResizeStatus checkpoints the last resizing decision for the pod.