Merge pull request #130602 from tallclair/allocation-checkpoint-optimization

Allocation checkpoint cleanup
This commit is contained in:
Kubernetes Prow Robot 2025-03-12 22:41:46 -07:00 committed by GitHub
commit 7f818e9f02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 58 additions and 50 deletions

View File

@ -176,12 +176,12 @@ func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
} }
func (m *manager) RemovePod(uid types.UID) { func (m *manager) RemovePod(uid types.UID) {
if err := m.allocated.Delete(uid, ""); err != nil { if err := m.allocated.RemovePod(uid); err != nil {
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
} }
if err := m.actuated.Delete(uid, ""); err != nil { if err := m.actuated.RemovePod(uid); err != nil {
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error. // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid) klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
} }

View File

@ -46,7 +46,7 @@ type Reader interface {
type writer interface { type writer interface {
SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error
Delete(podUID types.UID, containerName string) error RemovePod(podUID types.UID) error
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID]) RemoveOrphanedPods(remainingPods sets.Set[types.UID])
} }

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
) )
@ -36,6 +37,7 @@ type stateCheckpoint struct {
cache State cache State
checkpointManager checkpointmanager.CheckpointManager checkpointManager checkpointmanager.CheckpointManager
checkpointName string checkpointName string
lastChecksum checksum.Checksum
} }
// NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend // NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend
@ -45,7 +47,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
} }
praInfo, err := restoreState(checkpointManager, checkpointName) pra, checksum, err := restoreState(checkpointManager, checkpointName)
if err != nil { if err != nil {
//lint:ignore ST1005 user-facing error message //lint:ignore ST1005 user-facing error message
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
@ -53,33 +55,31 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
} }
stateCheckpoint := &stateCheckpoint{ stateCheckpoint := &stateCheckpoint{
cache: NewStateMemory(praInfo.AllocationEntries), cache: NewStateMemory(pra),
checkpointManager: checkpointManager, checkpointManager: checkpointManager,
checkpointName: checkpointName, checkpointName: checkpointName,
lastChecksum: checksum,
} }
return stateCheckpoint, nil return stateCheckpoint, nil
} }
// restores state from a checkpoint and creates it if it doesn't exist // restores state from a checkpoint and creates it if it doesn't exist
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) { func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) {
var err error
checkpoint := &Checkpoint{} checkpoint := &Checkpoint{}
if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
return &PodResourceAllocationInfo{ return nil, 0, nil
AllocationEntries: make(map[types.UID]map[string]v1.ResourceRequirements),
}, nil
} }
return nil, err return nil, 0, err
} }
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
praInfo, err := checkpoint.GetPodResourceAllocationInfo() praInfo, err := checkpoint.GetPodResourceAllocationInfo()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err) return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err)
} }
return praInfo, nil klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return praInfo.AllocationEntries, checkpoint.Checksum, nil
} }
// saves state to a checkpoint, caller is responsible for locking // saves state to a checkpoint, caller is responsible for locking
@ -92,11 +92,16 @@ func (sc *stateCheckpoint) storeState() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to create checkpoint: %w", err) return fmt.Errorf("failed to create checkpoint: %w", err)
} }
if checkpoint.Checksum == sc.lastChecksum {
// No changes to the checkpoint => no need to re-write it.
return nil
}
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to save pod allocation checkpoint") klog.ErrorS(err, "Failed to save pod allocation checkpoint")
return err return err
} }
sc.lastChecksum = checkpoint.Checksum
return nil return nil
} }
@ -134,11 +139,13 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[
} }
// Delete deletes allocations for specified pod // RemovePod deletes allocations for specified pod
func (sc *stateCheckpoint) Delete(podUID types.UID, containerName string) error { func (sc *stateCheckpoint) RemovePod(podUID types.UID) error {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
sc.cache.Delete(podUID, containerName) // Skip writing the checkpoint for pod deletion, since there is no side effect to
return sc.storeState() // keeping a deleted pod. Deleted pods will eventually be cleaned up by RemoveOrphanedPods.
// The deletion will be stored the next time a non-delete update is made.
return sc.cache.RemovePod(podUID)
} }
func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) { func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
@ -170,7 +177,7 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[strin
return nil return nil
} }
func (sc *noopStateCheckpoint) Delete(_ types.UID, _ string) error { func (sc *noopStateCheckpoint) RemovePod(_ types.UID) error {
return nil return nil
} }

View File

@ -19,13 +19,13 @@ package state
import ( import (
"fmt" "fmt"
"os" "os"
"path/filepath"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
) )
@ -124,6 +124,22 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
actual = newSC.GetPodResourceAllocation() actual = newSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation") verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation")
checkpointPath := filepath.Join(testDir, testCheckpoint)
require.FileExists(t, checkpointPath)
require.NoError(t, os.Remove(checkpointPath)) // Remove the checkpoint file to track whether it's re-written.
// Setting the pod allocations to the same values should not re-write the checkpoint.
for podUID, alloc := range tt.args.podResourceAllocation {
require.NoError(t, originalSC.SetPodResourceAllocation(podUID, alloc))
require.NoFileExists(t, checkpointPath, "checkpoint should not be re-written")
}
// Setting a new value should update the checkpoint.
require.NoError(t, originalSC.SetPodResourceAllocation("foo-bar", map[string]v1.ResourceRequirements{
"container1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}},
}))
require.FileExists(t, checkpointPath, "checkpoint should be re-written")
}) })
} }
} }
@ -138,14 +154,12 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
// prepare old checkpoint, ResizeStatusEntries is unset, // prepare old checkpoint, ResizeStatusEntries is unset,
// pretend that the old checkpoint is unaware of the field ResizeStatusEntries // pretend that the old checkpoint is unaware of the field ResizeStatusEntries
const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}` const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{ expectedPodResourceAllocation := PodResourceAllocation{
AllocationEntries: map[types.UID]map[string]v1.ResourceRequirements{ "pod1": {
"pod1": { "container1": {
"container1": { Requests: v1.ResourceList{
Requests: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("1Ki"),
v1.ResourceCPU: resource.MustParse("1Ki"), v1.ResourceMemory: resource.MustParse("1Ki"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
}, },
}, },
}, },
@ -157,15 +171,14 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
require.NoError(t, err, "failed to create old checkpoint") require.NoError(t, err, "failed to create old checkpoint")
actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName) actualPodResourceAllocation, _, err := restoreState(sc.checkpointManager, sc.checkpointName)
require.NoError(t, err, "failed to restore state") require.NoError(t, err, "failed to restore state")
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries) sc.cache = NewStateMemory(actualPodResourceAllocation)
actualPodResourceAllocationInfo = &PodResourceAllocationInfo{} actualPodResourceAllocation = sc.cache.GetPodResourceAllocation()
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") require.Equal(t, expectedPodResourceAllocation, actualPodResourceAllocation, "pod resource allocation info is not equal")
} }

View File

@ -79,23 +79,11 @@ func (s *stateMemory) SetPodResourceAllocation(podUID types.UID, alloc map[strin
return nil return nil
} }
func (s *stateMemory) deleteContainer(podUID types.UID, containerName string) { func (s *stateMemory) RemovePod(podUID types.UID) error {
delete(s.podAllocation[podUID], containerName)
if len(s.podAllocation[podUID]) == 0 {
delete(s.podAllocation, podUID)
}
klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID, "containerName", containerName)
}
func (s *stateMemory) Delete(podUID types.UID, containerName string) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if len(containerName) == 0 { delete(s.podAllocation, podUID)
delete(s.podAllocation, podUID) klog.V(3).InfoS("Deleted pod resource allocation", "podUID", podUID)
klog.V(3).InfoS("Deleted pod resource allocation and resize state", "podUID", podUID)
return nil
}
s.deleteContainer(podUID, containerName)
return nil return nil
} }