diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go index 576612c52a1..dac8f2f1fd0 100644 --- a/pkg/kubelet/allocation/allocation_manager.go +++ b/pkg/kubelet/allocation/allocation_manager.go @@ -176,12 +176,12 @@ func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements { } func (m *manager) RemovePod(uid types.UID) { - if err := m.allocated.Delete(uid, ""); err != nil { + if err := m.allocated.RemovePod(uid); err != nil { // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) } - if err := m.actuated.Delete(uid, ""); err != nil { + if err := m.actuated.RemovePod(uid); err != nil { // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) } diff --git a/pkg/kubelet/allocation/state/state.go b/pkg/kubelet/allocation/state/state.go index fff569ce3d1..21d240294ab 100644 --- a/pkg/kubelet/allocation/state/state.go +++ b/pkg/kubelet/allocation/state/state.go @@ -46,7 +46,7 @@ type Reader interface { type writer interface { SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error - Delete(podUID types.UID, containerName string) error + RemovePod(podUID types.UID) error // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. 
RemoveOrphanedPods(remainingPods sets.Set[types.UID]) } diff --git a/pkg/kubelet/allocation/state/state_checkpoint.go b/pkg/kubelet/allocation/state/state_checkpoint.go index cfb7244ea83..df07cae051f 100644 --- a/pkg/kubelet/allocation/state/state_checkpoint.go +++ b/pkg/kubelet/allocation/state/state_checkpoint.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" ) @@ -36,6 +37,7 @@ type stateCheckpoint struct { cache State checkpointManager checkpointmanager.CheckpointManager checkpointName string + lastChecksum checksum.Checksum } // NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend @@ -45,7 +47,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) { return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) } - praInfo, err := restoreState(checkpointManager, checkpointName) + pra, checksum, err := restoreState(checkpointManager, checkpointName) if err != nil { //lint:ignore ST1005 user-facing error message return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", @@ -53,33 +55,31 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) { } stateCheckpoint := &stateCheckpoint{ - cache: NewStateMemory(praInfo.AllocationEntries), + cache: NewStateMemory(pra), checkpointManager: checkpointManager, checkpointName: checkpointName, + lastChecksum: checksum, } return stateCheckpoint, nil } // restores state from a checkpoint and creates it if it doesn't exist -func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) { - var err error +func 
restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) { checkpoint := &Checkpoint{} - - if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { + if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { if err == errors.ErrCheckpointNotFound { - return &PodResourceAllocationInfo{ - AllocationEntries: make(map[types.UID]map[string]v1.ResourceRequirements), - }, nil + return nil, 0, nil } - return nil, err + return nil, 0, err } - klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") + praInfo, err := checkpoint.GetPodResourceAllocationInfo() if err != nil { - return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err) + return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err) } - return praInfo, nil + klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") + return praInfo.AllocationEntries, checkpoint.Checksum, nil } // saves state to a checkpoint, caller is responsible for locking @@ -92,11 +92,16 @@ func (sc *stateCheckpoint) storeState() error { if err != nil { return fmt.Errorf("failed to create checkpoint: %w", err) } + if checkpoint.Checksum == sc.lastChecksum { + // No changes to the checkpoint => no need to re-write it. 
+ return nil + } err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) if err != nil { klog.ErrorS(err, "Failed to save pod allocation checkpoint") return err } + sc.lastChecksum = checkpoint.Checksum return nil } @@ -134,11 +139,13 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[ } -// Delete deletes allocations for specified pod +// RemovePod deletes allocations for specified pod -func (sc *stateCheckpoint) Delete(podUID types.UID, containerName string) error { +func (sc *stateCheckpoint) RemovePod(podUID types.UID) error { sc.mux.Lock() defer sc.mux.Unlock() - sc.cache.Delete(podUID, containerName) - return sc.storeState() + // Skip writing the checkpoint for pod deletion, since there is no side effect to + // keeping a deleted pod. Deleted pods will eventually be cleaned up by RemoveOrphanedPods. + // The deletion will be stored the next time a non-delete update is made. + return sc.cache.RemovePod(podUID) } func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { @@ -170,7 +177,7 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[strin return nil } -func (sc *noopStateCheckpoint) Delete(_ types.UID, _ string) error { +func (sc *noopStateCheckpoint) RemovePod(_ types.UID) error { return nil } diff --git a/pkg/kubelet/allocation/state/state_checkpoint_test.go b/pkg/kubelet/allocation/state/state_checkpoint_test.go index 749875e6482..e8c067faf9c 100644 --- a/pkg/kubelet/allocation/state/state_checkpoint_test.go +++ b/pkg/kubelet/allocation/state/state_checkpoint_test.go @@ -19,13 +19,13 @@ package state import ( "fmt" "os" + "path/filepath" "testing" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" ) @@ -124,6 +124,22 @@ func Test_stateCheckpoint_storeState(t *testing.T) { actual = newSC.GetPodResourceAllocation() verifyPodResourceAllocation(t, 
&tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation") + + checkpointPath := filepath.Join(testDir, testCheckpoint) + require.FileExists(t, checkpointPath) + require.NoError(t, os.Remove(checkpointPath)) // Remove the checkpoint file to track whether it's re-written. + + // Setting the pod allocations to the same values should not re-write the checkpoint. + for podUID, alloc := range tt.args.podResourceAllocation { + require.NoError(t, originalSC.SetPodResourceAllocation(podUID, alloc)) + require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written") + } + + // Setting a new value should update the checkpoint. + require.NoError(t, originalSC.SetPodResourceAllocation("foo-bar", map[string]v1.ResourceRequirements{ + "container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}}, + })) + require.FileExists(t, checkpointPath, "checkpoint should be re-written") }) } } @@ -138,14 +154,12 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { // prepare old checkpoint, ResizeStatusEntries is unset, // pretend that the old checkpoint is unaware for the field ResizeStatusEntries const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}` - expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{ - AllocationEntries: map[types.UID]map[string]v1.ResourceRequirements{ - "pod1": { - "container1": { - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("1Ki"), - v1.ResourceMemory: resource.MustParse("1Ki"), - }, + expectedPodResourceAllocation := PodResourceAllocation{ + "pod1": { + "container1": { + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1Ki"), + v1.ResourceMemory: resource.MustParse("1Ki"), }, }, }, @@ -157,15 +171,14 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { err = 
sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) require.NoError(t, err, "failed to create old checkpoint") - actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName) + actualPodResourceAllocation, _, err := restoreState(sc.checkpointManager, sc.checkpointName) require.NoError(t, err, "failed to restore state") - require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") + require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal") - sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries) + sc.cache = NewStateMemory(actualPodResourceAllocation) - actualPodResourceAllocationInfo = &PodResourceAllocationInfo{} - actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() + actualPodResourceAllocation = sc.cache.GetPodResourceAllocation() - require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") + require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal") } diff --git a/pkg/kubelet/allocation/state/state_mem.go b/pkg/kubelet/allocation/state/state_mem.go index 7c46cd36183..5ad0e57f53e 100644 --- a/pkg/kubelet/allocation/state/state_mem.go +++ b/pkg/kubelet/allocation/state/state_mem.go @@ -79,23 +79,11 @@ func (s *stateMemory) SetPodResourceAllocation(podUID types.UID, alloc map[strin return nil } -func (s *stateMemory) deleteContainer(podUID types.UID, containerName string) { - delete(s.podAllocation[podUID], containerName) - if len(s.podAllocation[podUID]) == 0 { - delete(s.podAllocation, podUID) - } - klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID, "containerName", containerName) -} - -func (s *stateMemory) Delete(podUID types.UID, containerName string) error { +func (s 
*stateMemory) RemovePod(podUID types.UID) error { s.Lock() defer s.Unlock() - if len(containerName) == 0 { - delete(s.podAllocation, podUID) - klog.V(3).InfoS("Deleted pod resource allocation and resize state", "podUID", podUID) - return nil - } - s.deleteContainer(podUID, containerName) + delete(s.podAllocation, podUID) + klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID) return nil }