diff --git a/pkg/kubelet/allocation/state/state_checkpoint.go b/pkg/kubelet/allocation/state/state_checkpoint.go index cfb7244ea83..c95dd93ee37 100644 --- a/pkg/kubelet/allocation/state/state_checkpoint.go +++ b/pkg/kubelet/allocation/state/state_checkpoint.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" ) @@ -36,6 +37,7 @@ type stateCheckpoint struct { cache State checkpointManager checkpointmanager.CheckpointManager checkpointName string + lastChecksum checksum.Checksum } // NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend @@ -45,7 +47,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) { return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) } - praInfo, err := restoreState(checkpointManager, checkpointName) + pra, checksum, err := restoreState(checkpointManager, checkpointName) if err != nil { //lint:ignore ST1005 user-facing error message return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", @@ -53,33 +55,31 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) { } stateCheckpoint := &stateCheckpoint{ - cache: NewStateMemory(praInfo.AllocationEntries), + cache: NewStateMemory(pra), checkpointManager: checkpointManager, checkpointName: checkpointName, + lastChecksum: checksum, } return stateCheckpoint, nil } // restores state from a checkpoint and creates it if it doesn't exist -func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) { - var err error +func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) { checkpoint := &Checkpoint{} - - if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { + if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { if err == errors.ErrCheckpointNotFound { - return &PodResourceAllocationInfo{ - AllocationEntries: make(map[types.UID]map[string]v1.ResourceRequirements), - }, nil + return nil, 0, nil } - return nil, err + return nil, 0, err } - klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") + praInfo, err := checkpoint.GetPodResourceAllocationInfo() if err != nil { - return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err) + return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err) } - return praInfo, nil + klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") + return praInfo.AllocationEntries, checkpoint.Checksum, nil } // saves state to a checkpoint, caller is responsible for locking @@ -92,11 +92,16 @@ func (sc *stateCheckpoint) storeState() error { if err != nil { return fmt.Errorf("failed to create checkpoint: %w", err) } + if checkpoint.Checksum == sc.lastChecksum { + // No changes to the checkpoint => no need to re-write it. + return nil + } err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) if err != nil { klog.ErrorS(err, "Failed to save pod allocation checkpoint") return err } + sc.lastChecksum = checkpoint.Checksum return nil } diff --git a/pkg/kubelet/allocation/state/state_checkpoint_test.go b/pkg/kubelet/allocation/state/state_checkpoint_test.go index 749875e6482..e8c067faf9c 100644 --- a/pkg/kubelet/allocation/state/state_checkpoint_test.go +++ b/pkg/kubelet/allocation/state/state_checkpoint_test.go @@ -19,13 +19,13 @@ package state import ( "fmt" "os" + "path/filepath" "testing" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" ) @@ -124,6 +124,22 @@ func Test_stateCheckpoint_storeState(t *testing.T) { actual = newSC.GetPodResourceAllocation() verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation") + + checkpointPath := filepath.Join(testDir, testCheckpoint) + require.FileExists(t, checkpointPath) + require.NoError(t, os.Remove(checkpointPath)) // Remove the checkpoint file to track whether it's re-written. + + // Setting the pod allocations to the same values should not re-write the checkpoint. + for podUID, alloc := range tt.args.podResourceAllocation { + require.NoError(t, originalSC.SetPodResourceAllocation(podUID, alloc)) + require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written") + } + + // Setting a new value should update the checkpoint. + require.NoError(t, originalSC.SetPodResourceAllocation("foo-bar", map[string]v1.ResourceRequirements{ + "container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}}, + })) + require.FileExists(t, checkpointPath, "checkpoint should be re-written") }) } } @@ -138,14 +154,12 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { // prepare old checkpoint, ResizeStatusEntries is unset, // pretend that the old checkpoint is unaware for the field ResizeStatusEntries const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}` - expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{ - AllocationEntries: map[types.UID]map[string]v1.ResourceRequirements{ - "pod1": { - "container1": { - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("1Ki"), - v1.ResourceMemory: resource.MustParse("1Ki"), - }, + expectedPodResourceAllocation := PodResourceAllocation{ + "pod1": { + "container1": { + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1Ki"), + v1.ResourceMemory: resource.MustParse("1Ki"), }, }, }, @@ -157,15 +171,14 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) require.NoError(t, err, "failed to create old checkpoint") - actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName) + actualPodResourceAllocation, _, err := restoreState(sc.checkpointManager, sc.checkpointName) require.NoError(t, err, "failed to restore state") - require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") + require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal") - sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries) + sc.cache = NewStateMemory(actualPodResourceAllocation) - actualPodResourceAllocationInfo = &PodResourceAllocationInfo{} - actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() + actualPodResourceAllocation = sc.cache.GetPodResourceAllocation() - require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") + require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal") }