diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go index 6d2031419df..897bc34c5f4 100644 --- a/pkg/kubelet/status/fake_status_manager.go +++ b/pkg/kubelet/status/fake_status_manager.go @@ -93,6 +93,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe // NewFakeManager creates empty/fake memory manager func NewFakeManager() Manager { return &fakeManager{ - state: state.NewStateMemory(), + state: state.NewStateMemory(state.PodResourceAllocation{}), } } diff --git a/pkg/kubelet/status/state/state.go b/pkg/kubelet/status/state/state.go index 10a66be46c3..9dc669746e9 100644 --- a/pkg/kubelet/status/state/state.go +++ b/pkg/kubelet/status/state/state.go @@ -47,7 +47,6 @@ type Reader interface { type writer interface { SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error - SetPodResourceAllocation(PodResourceAllocation) error SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) Delete(podUID string, containerName string) error ClearState() error diff --git a/pkg/kubelet/status/state/state_checkpoint.go b/pkg/kubelet/status/state/state_checkpoint.go index 2e1ca729100..56c4bb786ca 100644 --- a/pkg/kubelet/status/state/state_checkpoint.go +++ b/pkg/kubelet/status/state/state_checkpoint.go @@ -42,46 +42,42 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) { if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) } - stateCheckpoint := &stateCheckpoint{ - cache: NewStateMemory(), - checkpointManager: checkpointManager, - checkpointName: checkpointName, + + praInfo, err := restoreState(checkpointManager, checkpointName) + if err != nil { + //lint:ignore ST1005 user-facing error message + return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", + err, path.Join(stateDir, checkpointName)) } - if err := stateCheckpoint.restoreState(); err != nil { - //lint:ignore ST1005 user-facing error message - return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName)) + stateCheckpoint := &stateCheckpoint{ + cache: NewStateMemory(praInfo.AllocationEntries), + checkpointManager: checkpointManager, + checkpointName: checkpointName, } return stateCheckpoint, nil } // restores state from a checkpoint and creates it if it doesn't exist -func (sc *stateCheckpoint) restoreState() error { - sc.mux.Lock() - defer sc.mux.Unlock() +func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) { var err error + checkpoint := &Checkpoint{} - checkpoint, err := NewCheckpoint(nil) - if err != nil { - return fmt.Errorf("failed to create new checkpoint: %w", err) - } - - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { + if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { if err == errors.ErrCheckpointNotFound { - return sc.storeState() + return &PodResourceAllocationInfo{ + AllocationEntries: make(map[string]map[string]v1.ResourceRequirements), + }, nil } - return err - } - praInfo, err := checkpoint.GetPodResourceAllocationInfo() - if err != nil { - return fmt.Errorf("failed to get pod resource allocation info: %w", err) - } - err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries) - if err != nil { - return fmt.Errorf("failed to set pod resource allocation: %w", err) + return nil, err } klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") - return nil + praInfo, err := checkpoint.GetPodResourceAllocationInfo() + if err != nil { + return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err) + } + + return praInfo, nil } // saves state to a checkpoint, caller is responsible for locking @@ -131,14 +127,6 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain return sc.storeState() } -// SetPodResourceAllocation sets pod resource allocation -func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) error { - sc.mux.Lock() - defer sc.mux.Unlock() - sc.cache.SetPodResourceAllocation(a) - return sc.storeState() -} - // SetPodResizeStatus sets the last resize decision for a pod func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { sc.mux.Lock() @@ -185,10 +173,6 @@ func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string return nil } -func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation) error { - return nil -} - func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {} func (sc *noopStateCheckpoint) Delete(_ string, _ string) error { diff --git a/pkg/kubelet/status/state/state_checkpoint_test.go b/pkg/kubelet/status/state/state_checkpoint_test.go index 03be8c1c46d..9a927b60372 100644 --- a/pkg/kubelet/status/state/state_checkpoint_test.go +++ b/pkg/kubelet/status/state/state_checkpoint_test.go @@ -32,7 +32,7 @@ const testCheckpoint = "pod_status_manager_state" func newTestStateCheckpoint(t *testing.T) *stateCheckpoint { testingDir := getTestDir(t) - cache := NewStateMemory() + cache := NewStateMemory(PodResourceAllocation{}) checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir) require.NoError(t, err, "failed to create checkpoint manager") checkpointName := "pod_state_checkpoint" @@ -110,8 +110,12 @@ func Test_stateCheckpoint_storeState(t *testing.T) { originalSC, err := NewStateCheckpoint(testDir, testCheckpoint) require.NoError(t, err) - err = originalSC.SetPodResourceAllocation(tt.args.podResourceAllocation) - require.NoError(t, err) + for podUID, containerAlloc := range tt.args.podResourceAllocation { + for containerName, alloc := range containerAlloc { + err = originalSC.SetContainerResourceAllocation(podUID, containerName, alloc) + require.NoError(t, err) + } + } actual := originalSC.GetPodResourceAllocation() verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation") @@ -154,11 +158,15 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) require.NoError(t, err, "failed to create old checkpoint") - err = sc.restoreState() + actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName) require.NoError(t, err, "failed to restore state") - actualPodResourceAllocationInfo := &PodResourceAllocationInfo{} + require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") + + sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries) + + actualPodResourceAllocationInfo = &PodResourceAllocationInfo{} actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() - require.NoError(t, err, "failed to get pod resource allocation info") + require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") } diff --git a/pkg/kubelet/status/state/state_mem.go b/pkg/kubelet/status/state/state_mem.go index c8107dacbe2..fca30e3f046 100644 --- a/pkg/kubelet/status/state/state_mem.go +++ b/pkg/kubelet/status/state/state_mem.go @@ -32,10 +32,13 @@ type stateMemory struct { var _ State = &stateMemory{} // NewStateMemory creates new State to track resources allocated to pods -func NewStateMemory() State { +func NewStateMemory(alloc PodResourceAllocation) State { + if alloc == nil { + alloc = PodResourceAllocation{} + } klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking") return &stateMemory{ - podAllocation: PodResourceAllocation{}, + podAllocation: alloc, podResizeStatus: PodResizeStatus{}, } } @@ -74,15 +77,6 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam return nil } -func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error { - s.Lock() - defer s.Unlock() - - s.podAllocation = a.Clone() - klog.V(3).InfoS("Updated pod resource allocation", "allocation", a) - return nil -} - func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { s.Lock() defer s.Unlock()