Don't re-write the checkpoint file when the content is unchanged

This commit is contained in:
Tim Allclair 2025-03-05 15:19:56 -08:00
parent c79d3ce434
commit 30bca329c5
2 changed files with 46 additions and 28 deletions

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
) )
@@ -36,6 +37,7 @@ type stateCheckpoint struct {
cache State cache State
checkpointManager checkpointmanager.CheckpointManager checkpointManager checkpointmanager.CheckpointManager
checkpointName string checkpointName string
lastChecksum checksum.Checksum
} }
// NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend // NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend
@@ -45,7 +47,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
} }
praInfo, err := restoreState(checkpointManager, checkpointName) pra, checksum, err := restoreState(checkpointManager, checkpointName)
if err != nil { if err != nil {
//lint:ignore ST1005 user-facing error message //lint:ignore ST1005 user-facing error message
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
@@ -53,33 +55,31 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
} }
stateCheckpoint := &stateCheckpoint{ stateCheckpoint := &stateCheckpoint{
cache: NewStateMemory(praInfo.AllocationEntries), cache: NewStateMemory(pra),
checkpointManager: checkpointManager, checkpointManager: checkpointManager,
checkpointName: checkpointName, checkpointName: checkpointName,
lastChecksum: checksum,
} }
return stateCheckpoint, nil return stateCheckpoint, nil
} }
// restores state from a checkpoint and creates it if it doesn't exist // restores state from a checkpoint and creates it if it doesn't exist
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) { func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) {
var err error
checkpoint := &Checkpoint{} checkpoint := &Checkpoint{}
if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
return &PodResourceAllocationInfo{ return nil, 0, nil
AllocationEntries: make(map[types.UID]map[string]v1.ResourceRequirements),
}, nil
} }
return nil, err return nil, 0, err
} }
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
praInfo, err := checkpoint.GetPodResourceAllocationInfo() praInfo, err := checkpoint.GetPodResourceAllocationInfo()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err) return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err)
} }
return praInfo, nil klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return praInfo.AllocationEntries, checkpoint.Checksum, nil
} }
// saves state to a checkpoint, caller is responsible for locking // saves state to a checkpoint, caller is responsible for locking
@@ -92,11 +92,16 @@ func (sc *stateCheckpoint) storeState() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to create checkpoint: %w", err) return fmt.Errorf("failed to create checkpoint: %w", err)
} }
if checkpoint.Checksum == sc.lastChecksum {
// No changes to the checkpoint => no need to re-write it.
return nil
}
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to save pod allocation checkpoint") klog.ErrorS(err, "Failed to save pod allocation checkpoint")
return err return err
} }
sc.lastChecksum = checkpoint.Checksum
return nil return nil
} }

View File

@@ -19,13 +19,13 @@ package state
import ( import (
"fmt" "fmt"
"os" "os"
"path/filepath"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
) )
@@ -124,6 +124,22 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
actual = newSC.GetPodResourceAllocation() actual = newSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation") verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation")
checkpointPath := filepath.Join(testDir, testCheckpoint)
require.FileExists(t, checkpointPath)
require.NoError(t, os.Remove(checkpointPath)) // Remove the checkpoint file to track whether it's re-written.
// Setting the pod allocations to the same values should not re-write the checkpoint.
for podUID, alloc := range tt.args.podResourceAllocation {
require.NoError(t, originalSC.SetPodResourceAllocation(podUID, alloc))
require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written")
}
// Setting a new value should update the checkpoint.
require.NoError(t, originalSC.SetPodResourceAllocation("foo-bar", map[string]v1.ResourceRequirements{
"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
}))
require.FileExists(t, checkpointPath, "checkpoint should be re-written")
}) })
} }
} }
@@ -138,8 +154,7 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
// prepare old checkpoint, ResizeStatusEntries is unset, // prepare old checkpoint, ResizeStatusEntries is unset,
// pretend that the old checkpoint is unaware for the field ResizeStatusEntries // pretend that the old checkpoint is unaware for the field ResizeStatusEntries
const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}` const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{ expectedPodResourceAllocation := PodResourceAllocation{
AllocationEntries: map[types.UID]map[string]v1.ResourceRequirements{
"pod1": { "pod1": {
"container1": { "container1": {
Requests: v1.ResourceList{ Requests: v1.ResourceList{
@@ -148,7 +163,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
}, },
}, },
}, },
},
} }
checkpoint := &Checkpoint{} checkpoint := &Checkpoint{}
err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent)) err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent))
@@ -157,15 +171,14 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
require.NoError(t, err, "failed to create old checkpoint") require.NoError(t, err, "failed to create old checkpoint")
actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName) actualPodResourceAllocation, _, err := restoreState(sc.checkpointManager, sc.checkpointName)
require.NoError(t, err, "failed to restore state") require.NoError(t, err, "failed to restore state")
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries) sc.cache = NewStateMemory(actualPodResourceAllocation)
actualPodResourceAllocationInfo = &PodResourceAllocationInfo{} actualPodResourceAllocation = sc.cache.GetPodResourceAllocation()
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
} }